如何使用 CompletionStage 的集合很好地完成 allOf/AnyOf
How to nicely do allOf/AnyOf with Collections of CompletionStage
目前要使用 CompletionStage 的集合做一些简单的事情需要跳过几个丑陋的箍:
public static CompletionStage<String> translate(String foo) {
// just example code to reproduce
return CompletableFuture.completedFuture("translated " + foo);
}
public static CompletionStage<List<String>> translateAllAsync(List<String> input) {
List<CompletableFuture<String>> tFutures = input.stream()
.map(s -> translate(s)
.toCompletableFuture())
.collect(Collectors.toList()); // cannot use toArray because of generics Arrays creation :-(
return CompletableFuture.allOf(tFutures.toArray(new CompletableFuture<?>[0])) // not using size() on purpose, see comments
.thenApply(nil -> tFutures.stream()
.map(f -> f.join())
.map(s -> s.toUpperCase())
.collect(Collectors.toList()));
}
我要写的是:
public CompletionStage<List<String>> translateAllAsync(List<String> input) {
// allOf takes a collection< futures<X>>,
// and returns a future<collection<x>> for thenApply()
return XXXUtil.allOf(input.stream()
.map(s -> translate(s))
.collect(Collectors.toList()))
.thenApply(translations -> translations.stream()
.map(s -> s.toUpperCase())
.collect(Collectors.toList()));
}
关于 toCompletableFuture 以及转换为数组和连接的整个仪式是样板文件,分散了实际代码语义的注意力。
可能有一个版本的 allOf() 返回 Future<Collection<Future<X>>>
而不是 Future<Collection<X>>
在某些情况下也可能有用。
我可以尝试自己实现 XXXUtil,但我想知道是否已经有针对此问题和类似问题的成熟的 3rdparty 库(例如 Spotify 的 CompletableFutures)。如果是这样,我希望看到此类库的等效代码作为答案。
或者上面发布的原始代码可以用不同的方式写得更紧凑?
JUnit测试代码:
@Test
public void testTranslate() throws Exception {
List<String> list = translateAllAsync(Arrays.asList("foo", "bar")).toCompletableFuture().get();
Collections.sort(list);
assertEquals(list,
Arrays.asList("TRANSLATED BAR", "TRANSLATED FOO"));
}
我刚刚查看了 CompletableFuture.allOf
的源代码,发现它基本上创建了一次处理两个阶段的节点二叉树。我们可以轻松实现类似的逻辑,而无需显式使用 toCompletableFuture()
并一次性处理结果列表生成:
public static <T> CompletionStage<List<T>> allOf(
Stream<? extends CompletionStage<? extends T>> source) {
return allOf(source.collect(Collectors.toList()));
}
public static <T> CompletionStage<List<T>> allOf(
List<? extends CompletionStage<? extends T>> source) {
int size = source.size();
if(size == 0) return CompletableFuture.completedFuture(Collections.emptyList());
List<T> result = new ArrayList<>(Collections.nCopies(size, null));
return allOf(source, result, 0, size-1).thenApply(x -> result);
}
private static <T> CompletionStage<Void> allOf(
List<? extends CompletionStage<? extends T>> source,
List<T> result, int from, int to) {
if(from < to) {
int mid = (from+to)>>>1;
return allOf(source, result, from, mid)
.thenCombine(allOf(source, result, mid+1, to), (x,y)->x);
}
return source.get(from).thenAccept(t -> result.set(from, t));
}
就是这样。
您可以使用此解决方案将问题代码的逻辑实现为
public static CompletionStage<List<String>> translateAllAsync(List<String> input) {
return allOf(input.stream().map(s -> translate(s)))
.thenApply(list -> list.stream()
.map(s -> s.toUpperCase())
.collect(Collectors.toList()));
}
尽管使用
会更自然
public static CompletionStage<List<String>> translateAllAsync(List<String> input) {
return allOf(input.stream().map(s -> translate(s).thenApply(String::toUpperCase)));
}
请注意,此解决方案保持顺序,因此无需对测试用例中的结果进行排序:
@Test
public void testTranslate() throws Exception {
List<String> list = translateAllAsync(Arrays.asList("foo", "bar")).toCompletableFuture().get();
assertEquals(list, Arrays.asList("TRANSLATED FOO", "TRANSLATED BAR"));
}
目前要使用 CompletionStage 的集合做一些简单的事情需要跳过几个丑陋的箍:
public static CompletionStage<String> translate(String foo) {
// just example code to reproduce
return CompletableFuture.completedFuture("translated " + foo);
}
public static CompletionStage<List<String>> translateAllAsync(List<String> input) {
List<CompletableFuture<String>> tFutures = input.stream()
.map(s -> translate(s)
.toCompletableFuture())
.collect(Collectors.toList()); // cannot use toArray because of generics Arrays creation :-(
return CompletableFuture.allOf(tFutures.toArray(new CompletableFuture<?>[0])) // not using size() on purpose, see comments
.thenApply(nil -> tFutures.stream()
.map(f -> f.join())
.map(s -> s.toUpperCase())
.collect(Collectors.toList()));
}
我要写的是:
public CompletionStage<List<String>> translateAllAsync(List<String> input) {
// allOf takes a collection< futures<X>>,
// and returns a future<collection<x>> for thenApply()
return XXXUtil.allOf(input.stream()
.map(s -> translate(s))
.collect(Collectors.toList()))
.thenApply(translations -> translations.stream()
.map(s -> s.toUpperCase())
.collect(Collectors.toList()));
}
关于 toCompletableFuture 以及转换为数组和连接的整个仪式是样板文件,分散了实际代码语义的注意力。
可能有一个版本的 allOf() 返回 Future<Collection<Future<X>>>
而不是 Future<Collection<X>>
在某些情况下也可能有用。
我可以尝试自己实现 XXXUtil,但我想知道是否已经有针对此问题和类似问题的成熟的 3rdparty 库(例如 Spotify 的 CompletableFutures)。如果是这样,我希望看到此类库的等效代码作为答案。
或者上面发布的原始代码可以用不同的方式写得更紧凑?
JUnit测试代码:
@Test
public void testTranslate() throws Exception {
List<String> list = translateAllAsync(Arrays.asList("foo", "bar")).toCompletableFuture().get();
Collections.sort(list);
assertEquals(list,
Arrays.asList("TRANSLATED BAR", "TRANSLATED FOO"));
}
我刚刚查看了 CompletableFuture.allOf
的源代码,发现它基本上创建了一次处理两个阶段的节点二叉树。我们可以轻松实现类似的逻辑,而无需显式使用 toCompletableFuture()
并一次性处理结果列表生成:
public static <T> CompletionStage<List<T>> allOf(
Stream<? extends CompletionStage<? extends T>> source) {
return allOf(source.collect(Collectors.toList()));
}
public static <T> CompletionStage<List<T>> allOf(
List<? extends CompletionStage<? extends T>> source) {
int size = source.size();
if(size == 0) return CompletableFuture.completedFuture(Collections.emptyList());
List<T> result = new ArrayList<>(Collections.nCopies(size, null));
return allOf(source, result, 0, size-1).thenApply(x -> result);
}
private static <T> CompletionStage<Void> allOf(
List<? extends CompletionStage<? extends T>> source,
List<T> result, int from, int to) {
if(from < to) {
int mid = (from+to)>>>1;
return allOf(source, result, from, mid)
.thenCombine(allOf(source, result, mid+1, to), (x,y)->x);
}
return source.get(from).thenAccept(t -> result.set(from, t));
}
就是这样。
您可以使用此解决方案将问题代码的逻辑实现为
public static CompletionStage<List<String>> translateAllAsync(List<String> input) {
return allOf(input.stream().map(s -> translate(s)))
.thenApply(list -> list.stream()
.map(s -> s.toUpperCase())
.collect(Collectors.toList()));
}
尽管使用
会更自然public static CompletionStage<List<String>> translateAllAsync(List<String> input) {
return allOf(input.stream().map(s -> translate(s).thenApply(String::toUpperCase)));
}
请注意,此解决方案保持顺序,因此无需对测试用例中的结果进行排序:
@Test
public void testTranslate() throws Exception {
List<String> list = translateAllAsync(Arrays.asList("foo", "bar")).toCompletableFuture().get();
assertEquals(list, Arrays.asList("TRANSLATED FOO", "TRANSLATED BAR"));
}