为所有人收集可抛物 CompletableFuture.allOf
Collect throwables for all CompletableFuture.allOf
疯狂的事情正在发生。我有一批并行异步任务,每个任务都可能抛出异常。我想执行所有这些并 收集 所有可能的异常并将其包装为只有一个异常。但看起来收集的 Throwable
对象可能会 deleted (!?),就像它被弱引用引用一样。最好举例说明。
例子
这是示例的完整源代码。我将把它分成逻辑部分,以更好地解释我正在尝试做的事情。如果您想尝试,只需将以下所有代码块合并到您的 IDE 中即可。
主要
我们调用 process()
并传递输入数据 - 将要处理的整数数组。我们检查异常,然后列出所有 收集的 异常。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class CollectExceptions {
public static void main(String[] args) throws Exception {
CompletableFuture<List<Integer>> futures =
process(Arrays.asList("1", "a", "b", "4"));
try {
futures.get();
} catch (ExecutionException e) {
BatchException batchException = (BatchException) e.getCause();
Stream.of(batchException.exceptions).forEach(System.out::println);
}
}
进程
首先,我们将输入 Iterable
转换为 CompletableFuture<Integer>[]
。这意味着对单个输入的每个工作都会传递给 supplyAsync
。由于我们要收集错误,所以我们使用 handle
方法。请注意,exeptionally()
和 whenComplete()
.
也会发生同样的事情
然后我们组成一个 CompletableFuture
所有输入期货。当那个完成时,即当所有输入都被处理时,我们决定应该抛出异常;如果收集到任何异常。
public static CompletableFuture<List<Integer>> process(Iterable<String> documents) {
final List<Throwable> throwables = new ArrayList<>();
CompletableFuture<Integer>[] allFuturesArray = StreamSupport
.stream(documents.spliterator(), false)
.map((document) -> CompletableFuture.supplyAsync(() -> workSync(document)).handle((list, t) -> {
if (t != null) {
throwables.add(t);
}
return list;
}))
.toArray(CompletableFuture[]::new);
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(allFuturesArray);
return allDoneFuture
.thenApply(v -> Stream.of(allFuturesArray)
.map(CompletableFuture::join)
.collect(Collectors.<Integer>toList())
)
.whenComplete((list, throwable) -> {
if (throwables.size() > 0) {
throw new BatchException(throwables);
}
});
}
工作
这是工作方法。我们添加了一些延迟,否则您将看不到问题。还有BatchException
的定义,就是简单的存储输入异常。
public static Integer workSync(String string) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.valueOf(string);
}
public static class BatchException extends RuntimeException {
public final Throwable[] exceptions;
public BatchException(Iterable<Throwable> exceptions) {
this.exceptions = StreamSupport
.stream(exceptions.spliterator(), false)
.toArray(Throwable[]::new);
}
}
}
执行和问题
程序行为不一致!当我从我的 IDE(在 DEBUG 模式下)运行它时,我看到有 2 个异常,但是 SOMETIMES 其中一个异常是 null
!!!
这是一个输出:
/Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home/bin/java....
null
java.util.concurrent.CompletionException: java.lang.NumberFormatException: For input string: "a"
done
哇!什么鬼?我们怎么收集了null
???看起来里面正在进行一些不安全的工作。
"It looks that some UNSAFE work is being done inside."
是的...您的 throwables
列表访问不同步:)
疯狂的事情正在发生。我有一批并行异步任务,每个任务都可能抛出异常。我想执行所有这些并 收集 所有可能的异常并将其包装为只有一个异常。但看起来收集的 Throwable
对象可能会 deleted (!?),就像它被弱引用引用一样。最好举例说明。
例子
这是示例的完整源代码。我将把它分成逻辑部分,以更好地解释我正在尝试做的事情。如果您想尝试,只需将以下所有代码块合并到您的 IDE 中即可。
主要
我们调用 process()
并传递输入数据 - 将要处理的整数数组。我们检查异常,然后列出所有 收集的 异常。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class CollectExceptions {
public static void main(String[] args) throws Exception {
CompletableFuture<List<Integer>> futures =
process(Arrays.asList("1", "a", "b", "4"));
try {
futures.get();
} catch (ExecutionException e) {
BatchException batchException = (BatchException) e.getCause();
Stream.of(batchException.exceptions).forEach(System.out::println);
}
}
进程
首先,我们将输入 Iterable
转换为 CompletableFuture<Integer>[]
。这意味着对单个输入的每个工作都会传递给 supplyAsync
。由于我们要收集错误,所以我们使用 handle
方法。请注意,exeptionally()
和 whenComplete()
.
然后我们组成一个 CompletableFuture
所有输入期货。当那个完成时,即当所有输入都被处理时,我们决定应该抛出异常;如果收集到任何异常。
public static CompletableFuture<List<Integer>> process(Iterable<String> documents) {
final List<Throwable> throwables = new ArrayList<>();
CompletableFuture<Integer>[] allFuturesArray = StreamSupport
.stream(documents.spliterator(), false)
.map((document) -> CompletableFuture.supplyAsync(() -> workSync(document)).handle((list, t) -> {
if (t != null) {
throwables.add(t);
}
return list;
}))
.toArray(CompletableFuture[]::new);
CompletableFuture<Void> allDoneFuture = CompletableFuture.allOf(allFuturesArray);
return allDoneFuture
.thenApply(v -> Stream.of(allFuturesArray)
.map(CompletableFuture::join)
.collect(Collectors.<Integer>toList())
)
.whenComplete((list, throwable) -> {
if (throwables.size() > 0) {
throw new BatchException(throwables);
}
});
}
工作
这是工作方法。我们添加了一些延迟,否则您将看不到问题。还有BatchException
的定义,就是简单的存储输入异常。
public static Integer workSync(String string) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Integer.valueOf(string);
}
public static class BatchException extends RuntimeException {
public final Throwable[] exceptions;
public BatchException(Iterable<Throwable> exceptions) {
this.exceptions = StreamSupport
.stream(exceptions.spliterator(), false)
.toArray(Throwable[]::new);
}
}
}
执行和问题
程序行为不一致!当我从我的 IDE(在 DEBUG 模式下)运行它时,我看到有 2 个异常,但是 SOMETIMES 其中一个异常是 null
!!!
这是一个输出:
/Library/Java/JavaVirtualMachines/jdk1.8.0_20.jdk/Contents/Home/bin/java....
null
java.util.concurrent.CompletionException: java.lang.NumberFormatException: For input string: "a"
done
哇!什么鬼?我们怎么收集了null
???看起来里面正在进行一些不安全的工作。
"It looks that some UNSAFE work is being done inside."
是的...您的 throwables
列表访问不同步:)