Java 8 个可完成的期货所有不同的数据类型
Java 8 Completable Futures allOf different data types
我有 3 个 CompletableFutures,所有 3 个都返回不同的数据类型。
我希望创建一个结果对象,它是所有 3 个期货返回结果的组合。
所以我当前的工作代码如下所示:
public ClassD getResultClassD() {
ClassD resultClass = new ClassD();
CompletableFuture<ClassA> classAFuture = CompletableFuture.supplyAsync(() -> service.getClassA() );
CompletableFuture<ClassB> classBFuture = CompletableFuture.supplyAsync(() -> service.getClassB() );
CompletableFuture<ClassC> classCFuture = CompletableFuture.supplyAsync(() -> service.getClassC() );
CompletableFuture.allOf(classAFuture, classBFuture, classCFuture)
.thenAcceptAsync(it -> {
ClassA classA = classAFuture.join();
if (classA != null) {
resultClass.setClassA(classA);
}
ClassB classB = classBFuture.join();
if (classB != null) {
resultClass.setClassB(classB);
}
ClassC classC = classCFuture.join();
if (classC != null) {
resultClass.setClassC(classC);
}
});
return resultClass;
}
我的问题是:
我在这里的假设是,因为我正在使用 allOf
和 thenAcceptAsync
,所以这个调用将是非阻塞的。我的理解对吗?
这是处理返回不同结果类型的多个期货的正确方法吗?
在thenAcceptAsync
中构造ClassD
对象是否正确?
- 在 thenAcceptAsync lambda 中使用
join
或 getNow
方法合适吗?
您的尝试方向正确,但不正确。您的方法 getResultClassD()
return 是一个已经实例化的 ClassD
类型的对象,任意线程将在其上调用修改方法,而 getResultClassD()
的调用者不会注意到。这可能会导致竞争条件,如果修改方法本身不是线程安全的,此外,调用者永远不会知道 ClassD
实例何时真正准备好使用。
正确的解决方案是:
public CompletableFuture<ClassD> getResultClassD() {
CompletableFuture<ClassA> classAFuture
= CompletableFuture.supplyAsync(() -> service.getClassA() );
CompletableFuture<ClassB> classBFuture
= CompletableFuture.supplyAsync(() -> service.getClassB() );
CompletableFuture<ClassC> classCFuture
= CompletableFuture.supplyAsync(() -> service.getClassC() );
return CompletableFuture.allOf(classAFuture, classBFuture, classCFuture)
.thenApplyAsync(dummy -> {
ClassD resultClass = new ClassD();
ClassA classA = classAFuture.join();
if (classA != null) {
resultClass.setClassA(classA);
}
ClassB classB = classBFuture.join();
if (classB != null) {
resultClass.setClassB(classB);
}
ClassC classC = classCFuture.join();
if (classC != null) {
resultClass.setClassC(classC);
}
return resultClass;
});
}
现在,getResultClassD()
的调用者可以使用 returned CompletableFuture
查询进度状态或链相关操作或使用 join()
检索结果,一旦操作完成。
为了解决其他问题,是的,这个操作是异步的,在 lambda 表达式中使用 join()
是合适的。 join
正是因为 Future.get()
被声明为抛出检查异常而创建的,这使得在这些 lambda 表达式中的使用变得不必要地困难。
请注意,null
测试只有在这些 service.getClassX()
实际上可以 return null
时才有用。如果其中一个服务调用因异常而失败,则整个操作(由 CompletableFuture<ClassD>
表示)将异常完成。
我走的是与@Holger 在他的回答中所做的类似的路线,但是将服务调用包装在一个 Optional 中,这会导致 thenApplyAsync 阶段的代码更清晰
CompletableFuture<Optional<ClassA>> classAFuture
= CompletableFuture.supplyAsync(() -> Optional.ofNullable(service.getClassA())));
CompletableFuture<Optional<ClassB>> classBFuture
= CompletableFuture.supplyAsync(() -> Optional.ofNullable(service.getClassB()));
CompletableFuture<Optional<ClassC>> classCFuture
= CompletableFuture.supplyAsync(() -> Optional.ofNullable(service.getClassC()));
return CompletableFuture.allOf(classAFuture, classBFuture, classCFuture)
.thenApplyAsync(dummy -> {
ClassD resultClass = new ClassD();
classAFuture.join().ifPresent(resultClass::setClassA)
classBFuture.join().ifPresent(resultClass::setClassB)
classCFuture.join().ifPresent(resultClass::setClassC)
return resultClass;
});
如果您不想声明那么多变量,另一种处理此问题的方法是使用 thenCombine 或 thenCombineAsync 将您的 futures 链接在一起。
public CompletableFuture<ClassD> getResultClassD()
{
return CompletableFuture.supplyAsync(ClassD::new)
.thenCombine(CompletableFuture.supplyAsync(service::getClassA), (d, a) -> {
d.setClassA(a);
return d;
})
.thenCombine(CompletableFuture.supplyAsync(service::getClassB), (d, b) -> {
d.setClassB(b);
return d;
})
.thenCombine(CompletableFuture.supplyAsync(service::getClassC), (d, c) -> {
d.setClassC(c);
return d;
});
}
getter 仍将异步触发,结果按顺序执行。它基本上是获得相同结果的另一种语法选项。
我 运行 以前做过类似的事情,并创建了一个简短的演示来展示我是如何解决这个问题的。
与@Holger 类似的概念,只是我使用了一个函数来组合每个单独的未来。
https://github.com/te21wals/CompletableFuturesDemo
本质上:
public class CombindFunctionImpl implement CombindFunction {
public ABCData combind (ClassA a, ClassB b, ClassC c) {
return new ABCData(a, b, c);
}
}
...
public class FutureProvider {
public CompletableFuture<ClassA> retrieveClassA() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ClassA();
});
}
public CompletableFuture<ClassB> retrieveClassB() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ClassB();
});
}
public CompletableFuture<ClassC> retrieveClassC() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ClassC();
});
}
}
......
public static void main (String[] args){
CompletableFuture<ClassA> classAfuture = futureProvider.retrieveClassA();
CompletableFuture<ClassB> classBfuture = futureProvider.retrieveClassB();
CompletableFuture<ClassC> classCfuture = futureProvider.retrieveClassC();
System.out.println("starting completable futures ...");
long startTime = System.nanoTime();
ABCData ABCData = CompletableFuture.allOf(classAfuture, classBfuture, classCfuture)
.thenApplyAsync(ignored ->
combineFunction.combind(
classAfuture.join(),
classBfuture.join(),
classCfuture.join())
).join();
long endTime = System.nanoTime();
long duration = (endTime - startTime);
System.out.println("completable futures are complete...");
System.out.println("duration:\t" + Duration.ofNanos(duration).toString());
System.out.println("result:\t" + ABCData);
}
我有 3 个 CompletableFutures,所有 3 个都返回不同的数据类型。
我希望创建一个结果对象,它是所有 3 个期货返回结果的组合。
所以我当前的工作代码如下所示:
public ClassD getResultClassD() {
ClassD resultClass = new ClassD();
CompletableFuture<ClassA> classAFuture = CompletableFuture.supplyAsync(() -> service.getClassA() );
CompletableFuture<ClassB> classBFuture = CompletableFuture.supplyAsync(() -> service.getClassB() );
CompletableFuture<ClassC> classCFuture = CompletableFuture.supplyAsync(() -> service.getClassC() );
CompletableFuture.allOf(classAFuture, classBFuture, classCFuture)
.thenAcceptAsync(it -> {
ClassA classA = classAFuture.join();
if (classA != null) {
resultClass.setClassA(classA);
}
ClassB classB = classBFuture.join();
if (classB != null) {
resultClass.setClassB(classB);
}
ClassC classC = classCFuture.join();
if (classC != null) {
resultClass.setClassC(classC);
}
});
return resultClass;
}
我的问题是:
我在这里的假设是,因为我正在使用
allOf
和thenAcceptAsync
,所以这个调用将是非阻塞的。我的理解对吗?这是处理返回不同结果类型的多个期货的正确方法吗?
在
thenAcceptAsync
中构造ClassD
对象是否正确?- 在 thenAcceptAsync lambda 中使用
join
或getNow
方法合适吗?
您的尝试方向正确,但不正确。您的方法 getResultClassD()
return 是一个已经实例化的 ClassD
类型的对象,任意线程将在其上调用修改方法,而 getResultClassD()
的调用者不会注意到。这可能会导致竞争条件,如果修改方法本身不是线程安全的,此外,调用者永远不会知道 ClassD
实例何时真正准备好使用。
正确的解决方案是:
public CompletableFuture<ClassD> getResultClassD() {
CompletableFuture<ClassA> classAFuture
= CompletableFuture.supplyAsync(() -> service.getClassA() );
CompletableFuture<ClassB> classBFuture
= CompletableFuture.supplyAsync(() -> service.getClassB() );
CompletableFuture<ClassC> classCFuture
= CompletableFuture.supplyAsync(() -> service.getClassC() );
return CompletableFuture.allOf(classAFuture, classBFuture, classCFuture)
.thenApplyAsync(dummy -> {
ClassD resultClass = new ClassD();
ClassA classA = classAFuture.join();
if (classA != null) {
resultClass.setClassA(classA);
}
ClassB classB = classBFuture.join();
if (classB != null) {
resultClass.setClassB(classB);
}
ClassC classC = classCFuture.join();
if (classC != null) {
resultClass.setClassC(classC);
}
return resultClass;
});
}
现在,getResultClassD()
的调用者可以使用 returned CompletableFuture
查询进度状态或链相关操作或使用 join()
检索结果,一旦操作完成。
为了解决其他问题,是的,这个操作是异步的,在 lambda 表达式中使用 join()
是合适的。 join
正是因为 Future.get()
被声明为抛出检查异常而创建的,这使得在这些 lambda 表达式中的使用变得不必要地困难。
请注意,null
测试只有在这些 service.getClassX()
实际上可以 return null
时才有用。如果其中一个服务调用因异常而失败,则整个操作(由 CompletableFuture<ClassD>
表示)将异常完成。
我走的是与@Holger 在他的回答中所做的类似的路线,但是将服务调用包装在一个 Optional 中,这会导致 thenApplyAsync 阶段的代码更清晰
CompletableFuture<Optional<ClassA>> classAFuture
= CompletableFuture.supplyAsync(() -> Optional.ofNullable(service.getClassA())));
CompletableFuture<Optional<ClassB>> classBFuture
= CompletableFuture.supplyAsync(() -> Optional.ofNullable(service.getClassB()));
CompletableFuture<Optional<ClassC>> classCFuture
= CompletableFuture.supplyAsync(() -> Optional.ofNullable(service.getClassC()));
return CompletableFuture.allOf(classAFuture, classBFuture, classCFuture)
.thenApplyAsync(dummy -> {
ClassD resultClass = new ClassD();
classAFuture.join().ifPresent(resultClass::setClassA)
classBFuture.join().ifPresent(resultClass::setClassB)
classCFuture.join().ifPresent(resultClass::setClassC)
return resultClass;
});
如果您不想声明那么多变量,另一种处理此问题的方法是使用 thenCombine 或 thenCombineAsync 将您的 futures 链接在一起。
public CompletableFuture<ClassD> getResultClassD()
{
return CompletableFuture.supplyAsync(ClassD::new)
.thenCombine(CompletableFuture.supplyAsync(service::getClassA), (d, a) -> {
d.setClassA(a);
return d;
})
.thenCombine(CompletableFuture.supplyAsync(service::getClassB), (d, b) -> {
d.setClassB(b);
return d;
})
.thenCombine(CompletableFuture.supplyAsync(service::getClassC), (d, c) -> {
d.setClassC(c);
return d;
});
}
getter 仍将异步触发,结果按顺序执行。它基本上是获得相同结果的另一种语法选项。
我 运行 以前做过类似的事情,并创建了一个简短的演示来展示我是如何解决这个问题的。
与@Holger 类似的概念,只是我使用了一个函数来组合每个单独的未来。
https://github.com/te21wals/CompletableFuturesDemo
本质上:
public class CombindFunctionImpl implement CombindFunction {
public ABCData combind (ClassA a, ClassB b, ClassC c) {
return new ABCData(a, b, c);
}
}
...
public class FutureProvider {
public CompletableFuture<ClassA> retrieveClassA() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ClassA();
});
}
public CompletableFuture<ClassB> retrieveClassB() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(2000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ClassB();
});
}
public CompletableFuture<ClassC> retrieveClassC() {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new ClassC();
});
}
}
......
public static void main (String[] args){
CompletableFuture<ClassA> classAfuture = futureProvider.retrieveClassA();
CompletableFuture<ClassB> classBfuture = futureProvider.retrieveClassB();
CompletableFuture<ClassC> classCfuture = futureProvider.retrieveClassC();
System.out.println("starting completable futures ...");
long startTime = System.nanoTime();
ABCData ABCData = CompletableFuture.allOf(classAfuture, classBfuture, classCfuture)
.thenApplyAsync(ignored ->
combineFunction.combind(
classAfuture.join(),
classBfuture.join(),
classCfuture.join())
).join();
long endTime = System.nanoTime();
long duration = (endTime - startTime);
System.out.println("completable futures are complete...");
System.out.println("duration:\t" + Duration.ofNanos(duration).toString());
System.out.println("result:\t" + ABCData);
}