Spring AOP:如何在具有 void return 类型的异步方法中重新抛出异常

Spring AOP: How to rethrow an exception in the async method with void return type

我有以下应用程序(与 Gradle + Spring Boot 相同的应用程序在这里 https://www.dropbox.com/s/vizr5joyhixmdca/demo.zip?dl=0):

Writer.java 包含一些在 @Async 注释的帮助下 运行 异步的代码。一种方法 returns void 和另一种方法 returns Future。根据文档,这两种变体都是允许的。

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.util.concurrent.Future;

@Component
@Async("customExecutor")
public class Writer {

    public void write() {
        System.out.println("Writing something");
        throw new RuntimeException("Writer exception");
    }

    public Future<Void> writeFuture() {
        System.out.println("Writing something with future");
        throw new RuntimeException("Writer exception with future");
    }
}

ErrorHandlingThreadPoolExecutor.java 是自定义执行器。与 ThreadPoolExecutor 的唯一区别是它的错误处理。 afterExecute 实现与方法的 javadoc 中建议的完全相同。所以这里的想法是当异常发生时打印 "[ERROR] " + ex

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component("customExecutor")
public class ErrorHandlingThreadPoolExecutor extends ThreadPoolExecutor {

    public ErrorHandlingThreadPoolExecutor() {
        super(1, 1, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (t == null && r instanceof Future<?>) {
            try {
                ((Future<?>) r).get();
            } catch (CancellationException ce) {
                t = ce;
            } catch (ExecutionException ee) {
                t = ee.getCause();
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
        if (t != null) {
            handleError(t);
        }
    }

    private void handleError(Throwable ex) {
        System.out.println("[ERROR] " + ex);
    }
}

Config.java开启异步处理+调度。它还按计划调用 writer.write

import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@Configuration
@EnableScheduling
@EnableAsync
public class Config {

    private final Writer writer;

    public Config(Writer writer) {
        this.writer = writer;
    }

    @Scheduled(fixedRate = 1000)
    public void writeBySchedule() {
        writer.write();
//        writer.writeFuture();
    }
}

当我 运行 这个应用程序时,我看到以下输出:

Writing something
2020-07-14 21:16:33.791 ERROR 19860 --- [pool-1-thread-1] .a.i.SimpleAsyncUncaughtExceptionHandler : Unexpected exception occurred invoking async method: public void com.example.demo.Writer.write()

java.lang.RuntimeException: Writer exception
    at com.example.demo.Writer.write(Writer.java:14) ~[main/:na]
    at com.example.demo.Writer$$FastClassBySpringCGLIB$$cd00988d.invoke(<generated>) ~[main/:na]
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:771) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:749) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke[=15=](AsyncExecutionInterceptor.java:115) ~[spring-aop-5.2.7.RELEASE.jar:5.2.7.RELEASE]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_242]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[na:1.8.0_242]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[na:1.8.0_242]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_242]
...

同时,如果我评论 writer.write() 并取消评论 writer.writeFuture(),我会得到以下信息:

Writing something with future
[ERROR] java.lang.RuntimeException: Writer exception with future
...

后者是我试图通过 ErrorHandlingThreadPoolExecutor 实现的目标。但是我想保留我的方法 return void。 我发现我的异常没有到达自定义 ErrorHandlingThreadPoolExecutor.handleError() 方法的原因在这里:https://github.com/spring-projects/spring-framework/blob/master/spring-aop/src/main/java/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java#L308。此方法在我的自定义方法之前执行,似乎无法为 void 方法重新抛出异常。我知道 AsyncConfigurerSupport class 允许自定义异常处理,但异常仍然不会从 AsyncExecutionAspectSupport.handleError().

中逃脱

总而言之,如果它们将 void 声明为 return 类型,是否有任何方法可以将我的异常从异步执行的方法传播到 ErrorHandlingThreadPoolExecutor.handleError()?现在看来我可以在没有 @Async 的情况下直接使用执行程序,但是 @Async 有可能吗?如果不是,什么是“侵入性”较小的修复(更改和维护的代码较少)?我有很多异步方法 returning void.

更新:根据接受的答案,我提出了以下方面:

import org.apache.commons.lang3.StringUtils;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.util.Map;

@Component
@Aspect
public class ErrorHandlingAspect implements ApplicationListener<ContextRefreshedEvent> {

    public static final String DEFAULT_EXECUTOR_BEAN_NAME = "defaultExecutor";

    private Map<String, ErrorHandlingThreadPoolExecutor> errorHandlingExecutors;

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // initializing here because not all beans come if initialized in constructor
        this.errorHandlingExecutors = event.getApplicationContext()
                .getBeansOfType(ErrorHandlingThreadPoolExecutor.class);
    }

    @Pointcut(
            // where @Async is on class level
            "@within(org.springframework.scheduling.annotation.Async)"
                    // where @Async is on method level
                    + " || @annotation(org.springframework.scheduling.annotation.Async)")
    public void asyncMethods() {
    }

    @Around("asyncMethods()")
    public Object runWithErrorHandling(ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        Async annotation = method.getAnnotation(Async.class);
        if (annotation == null) {
            annotation = method.getDeclaringClass().getAnnotation(Async.class);
        }
        if (annotation == null) {
            // shouldn't happen because of pointcut configuration, just for safety
            return joinPoint.proceed();
        }

        String asyncExecutorName = annotation.value();
        if (StringUtils.isEmpty(asyncExecutorName)) {
            asyncExecutorName = DEFAULT_EXECUTOR_BEAN_NAME;
        }

        ErrorHandlingThreadPoolExecutor asyncExecutor = errorHandlingExecutors.get(asyncExecutorName);
        if (asyncExecutor == null) {
            // can happen if the declared executor isn't extending ErrorHandlingThreadPoolExecutor
            // or if @Async uses the default executor which is either not registered as a bean at all
            // or not named DEFAULT_EXECUTOR_BEAN_NAME
            return joinPoint.proceed();
        }

        try {
            return joinPoint.proceed();
        } catch (Throwable throwable) {
            asyncExecutor.handleError(throwable);
            return null;
        }
    }
}

优点:

  1. 允许在不处理线程的情况下处理异步执行代码中的错误。
  2. 可以根据执行程序进行不同的错误处理。
  3. 可以包装方法 returning voidFuture<>

缺点:

  1. 无法处理调用线程中的错误(仅在被调用线程中)。
  2. 需要将默认执行器注册为 bean 并为其指定一个特定名称。
  3. 仅适用于 @Async 注释,不适用于使用 submit().
  4. 直接传递给执行程序的异步代码

如果你使用这样的方面,你可以摆脱执行器中的错误处理块,或者只使用普通的执行器并完全删除整个(不是真正起作用的)错误处理执行器。我做到了并且有效:

package com.example.demo;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;

@Component
@Aspect
public class ErrorHandlingAspect {
  // If necessary, narrow down the pointcut more here
  @Around("@within(org.springframework.scheduling.annotation.Async)")
  public Object advice(ProceedingJoinPoint joinPoint) {
    try {
      return joinPoint.proceed();
    }
    catch (Throwable throwable) {
      handleError(throwable);
      // Can also return empty future here for non-void methods
      return null;
    }
  }

  private void handleError(Throwable ex) {
    System.out.println("[ERROR] " + ex);
  }
}

当我删除 ErrorHandlingThreadPoolExecutor 时,将 Writer 上的注释更改为 @AsyncConfig.writeBySchedule,如下所示:

@Scheduled(fixedRate = 1000)
public void writeBySchedule() {
  writer.write();
  writer.writeFuture();
}

控制台日志如下所示:

  .   ____          _            __ _ _
 /\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.1.8.RELEASE)

2020-07-15 07:41:02.314  INFO 18672 --- [           main] com.example.demo.DemoApplication         : Starting DemoApplication on Xander-Ultrabook with PID 18672 (C:\Users\alexa\Documents\java-src\spring-aop-playground\target\classes started by alexa in C:\Users\alexa\Documents\java-src\spring-aop-playground)
(...)
2020-07-15 07:41:06.839  INFO 18672 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 'taskScheduler'
Writing something
Writing something with future
[ERROR] java.lang.RuntimeException: Writer exception
[ERROR] java.lang.RuntimeException: Writer exception with future
Writing something
[ERROR] java.lang.RuntimeException: Writer exception
Writing something with future
[ERROR] java.lang.RuntimeException: Writer exception with future
Writing something
Writing something with future
[ERROR] java.lang.RuntimeException: Writer exception
[ERROR] java.lang.RuntimeException: Writer exception with future
(...)

P.S.: 这是关于如何提出好问题的元主题,因此有点偏离主题。) 感谢您研究的示例性问题并且记录得很好,甚至包括下载 link 到 MCVE.

通常我必须首先要求 MCVE 或完整 类 因为问题作者不知道为什么会发生错误,仍然认为他们知道其他人需要他们的代码和配置的哪些部分以便回答同样的问题,这通常被证明是错误的。在大多数情况下,如果他们知道错误出在哪里,他们就可以自行修复,不是吗?然后我开始要求 MCVE,因为我无法调试散文或不连贯的代码片段集,这是一个乏味的过程,特别是如果 OP 开始与我争论为什么他们应该这样做,因为他们认为他们的问题很清楚。但如果他们是,我不会要求更多信息。

相反,你的问题很明确,我可以运行调试代码并尝试我的想法。 3分钟解决问题。我希望有更多问题像你一样。 :)