如何从提交给执行者的已取消+中断的可调用项中获取异常?

How to get the exception from a cancelled+interrupted callable submitted to an executor?

考虑以下代码:

ExecutorService executor = Executors.newSingleThreadExecutor();
final CountDownLatch taskStarted = new CountDownLatch(1);
Future<String> future = executor.submit( new Callable<String>() {
    @Override
    public synchronized String call() throws Exception {
        try {
            taskStarted.countDown();
            this.wait( 60000 );
            return "foo";                   
        }
        catch( Exception iWantToGetThisExceptionOutside ) {
            iWantToGetThisExceptionOutside.printStackTrace();
            throw iWantToGetThisExceptionOutside;
        }
    }
});
assertTrue(taskStarted.await(60, TimeUnit.SECONDS));
try {
    future.get(500, TimeUnit.MILLISECONDS);
    fail("Timeout expected.");
} catch (ExecutionException | TimeoutException e) {
    e.printStackTrace();
}
future.cancel(true); //mayInterruptIfRunning
//how to get iWantToGetThisExceptionOutside here?

有没有办法在取消后在主线程中获取iWantToGetThisExceptionOutside?我必须创建自己的执行程序吗?

编辑: 只是为了说明没有 ExecutionException 被抛出,但是 TimeoutException 不包含任何原因。 iWantToGetThisExceptionOutside 是正常的 InterruptedException.

EDIT2: 一点澄清:任务相对简单。如果任务运行时间过长,我希望能够取消该任务。为此,我需要一个带超时的 get call ,它会在超时时抛出异常。我仍然欢迎在我的日志中添加一个堆栈跟踪条目,该条目显示 WHERE 任务已取消。为此,我需要 Callable.

之外的这个例外

您接住的 ExecutionException 应该在抛出时环绕在您的 iWant... Exception 周围。

您可以通过在主线程的 catch 语句中检查 ExecutionExceptionThrowable 来添加自定义逻辑:

// pseudo-code
if (e.getCause().[something, i.e. getMessage]) {
    // TODO something
}

备注

在您的 call 实施中,您正在 catch 并重新 throw 相同的 Exception,这没有意义。

注二

可调用函数的逻辑中推断超时并没有真正意义,尽管您始终可以通过编程方式计算从 [= 开始执行某些操作所需的时间17=] 到 call 结束。

超时和 TimeoutException 的全部意义在于调用者决定此延迟任务花费的时间太长。

为此,您在 catch 语句中捕获了 TimeoutException

如果您需要 "decorate" 您的 TimeoutException 而不是特定原因导致执行时间过长,您可以:

  • 以编程方式计算从 call 调用开始到 call 调用结束的时间,以及 throw 将由 ExecutionException(非常难看),
  • call 方法中有自己的每个 "sub-task" 的延迟执行,并且 throw 自定义 Exception 任何超时的

Future.get() 方法是获取从 call() 中抛出的任何异常的唯一方法。所以只需在 future.cancel(true) 之后添加另一个 future.get();

而不是依赖于 throw/catch,您可以从 Callable 中简单地作为一个对象携带异常(使用同步共享状态)。看起来很丑但是很管用。

    ExecutorService executor = Executors.newSingleThreadExecutor();
    final CountDownLatch taskStarted = new CountDownLatch(1);
    final CountDownLatch taskCompleted = new CountDownLatch(1);  // <- to sync on task completion
    final Exception[] wasSomethingWrong = new Exception[1];      // <- not thread safe, but works here  
    Future<String> future = executor.submit( new Callable<String>() {
        @Override
        public synchronized String call() throws Exception {
            try {
                taskStarted.countDown();
                this.wait( 60000 );
            }
            catch( Exception iWantToGetThisExceptionOutside ) {
                wasSomethingWrong[0] = iWantToGetThisExceptionOutside; // <-
            } finally {
                taskCompleted.countDown();  // <- 
            }
            return "foo";
        }
    });
    assertTrue(taskStarted.await(60, TimeUnit.SECONDS));
    try {
        future.get(500, TimeUnit.MILLISECONDS);
        fail("Timeout expected.");
    } catch (ExecutionException | TimeoutException e) {
        e.printStackTrace();
    }
    future.cancel(true); //mayInterruptIfRunning
    taskCompleted.await(60, TimeUnit.SECONDS);  // <- sync

    assertNotNull(wasSomethingWrong[0]);
    System.out.println(Arrays.toString(wasSomethingWrong[0].getStackTrace()));
    assertEquals(InterruptedException.class, wasSomethingWrong[0].getClass());  // <- PROFIT

您可以使用自定义的 FutureTask:

public class TracingFutureTask<T> extends FutureTask<T> {
    private Throwable trace;
    private boolean done;

    public TracingFutureTask(Callable<T> callable) {
        super(callable);
    }
    public TracingFutureTask(Runnable runnable, T result) {
        super(runnable, result);
    }

    @Override
    public void run() {
        try { super.run(); }
        finally { synchronized(this) { done=true; notifyAll(); }}
    }

    @Override
    protected void setException(Throwable t) {
        trace=t;
        super.setException(t);
    }
    public synchronized Throwable getException() throws InterruptedException {
        while(!done) wait();
        return trace;
    }

    public synchronized Throwable getException(long timeout)
                        throws InterruptedException, TimeoutException {

        for(long deadline = System.currentTimeMillis()+timeout, toWait=timeOut;
            !done; toWait = deadline-System.currentTimeMillis()) {

            if ( toWait <=0 ) throw new TimeoutException(
                              "Thread did not end in " + timeout + " milliseconds!" );
            wait(toWait);
        }
        return trace;
    }

    public static <V> TracingFutureTask<V> submit(Executor e, Callable<V> c) {
        TracingFutureTask<V> ft=new TracingFutureTask<>(c);
        e.execute(ft);
        return ft;
    }
    public static <V> TracingFutureTask<V> submit(Executor e, Runnable r, V v) {
        TracingFutureTask<V> ft=new TracingFutureTask<>(r, v);
        e.execute(ft);
        return ft;
    }
}

这会额外跟踪基础 class 的异常,但与基础 class 不同的是,即使作业已被取消,它也会记住它。这就是为什么 run() 方法和 getException() 之间有一个额外的同步,因为在取消情况下作业可以进入取消状态(这意味着“完成”)before 已经记录了异常,所以我们必须引入我们自己的 done 适当同步的状态。

它可以像这样使用:

ExecutorService executor = Executors.newSingleThreadExecutor();
TracingFutureTask<String> future=TracingFutureTask.submit(executor, new Callable<String>(){
    @Override
    public synchronized String call() throws Exception {
        this.wait( 60000 );
        return "foo";                   
    }
});
try {
    future.get(500, TimeUnit.MILLISECONDS);
    fail("Timeout expected.");
} catch (ExecutionException | TimeoutException e) {
    e.printStackTrace();
}
if(future.cancel(true)) {
    System.err.println("cancelled.");
    Throwable t = future.getException();
    if(t!=null) t.printStackTrace(System.err.append("cancellation caused "));
}

(源自您的示例代码)

java.util.concurrent.TimeoutException
    at java.util.concurrent.FutureTask.get(FutureTask.java:205)
    at so.TestCancel.main(TestCancel.java:69)
cancelled.
cancellation caused java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at so.TestCancel.call(TestCancel.java:64)
    at so.TestCancel.call(TestCancel.java:61)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at so.TracingFutureTask.run(TestCancel.java:33)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)