如何等待封闭的异步块完成
How to wait for enclosed asynchronous block to finish
我遇到了 stream().forEach
问题,它没有在方法 returns 之前及时完成,方法如下:
我的实体:
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Foo {
private String a;
private int b;
private int c;
private String d;
private String e;
}
我有一个调用外部服务的方法,它获取 Foo
的列表,然后对于该列表的每个成员,它调用另外两个外部服务来填充 d
和 e
字段:
public List<Foo> getOrdresListe() {
Foo[] fooArray = externalServiceOne.getFooList();
Arrays.stream(fooArray).forEach((f) -> {
CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f;
}));
});
List<Foo> fooList = Arrays.asList(fooArray);
return fooList; // when this statement is reached d and e fields are null.
}
由于一些性能问题(和一些最佳实践),我在调用服务时与自定义供应商异步调用 externalServiceTwo.getDeeEntity
和 externalServiceThree.getEeeEntity
以启动一些依赖项。但主要问题是返回 fooList
时 d
和 e
字段为空。
我的问题是如何在返回之前等待所有异步执行完成 fooList
?
为什么不创建所有 CompletableFuture
作为中间操作并等待所有异步执行在终端操作中完成。
Arrays.stream(fooArray).map((f) -> CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f;
})
)).forEach(fooCompletableFuture -> {
try {
fooCompletableFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
您可以只记住异步任务并手动等待它们完成:
public List<Foo> getOrdresListe() {
Foo[] fooArray = externalServiceOne.getFooList();
final List<CompletableFuture<Foo>> futures = new ArrayList<>(); // remember each
Arrays.stream(fooArray).forEach((f) -> {
futures.add(CompletableFuture.supplyAsync(
AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f;
})));
});
for (CompletableFuture<Foo> future : futures) {
future.join(); // wait for each
}
List<Foo> fooList = Arrays.asList(fooArray);
return fooList;
}
可能会尝试这样的事情:发送异步请求并创建 Foo
类型的可完成期货列表,然后按照@boobalan 的建议使用 join
作为终端操作;您以这种方式获得结果,您可以等待所有异步执行完成后再返回 fooList。
List<foo> fooData = externalServiceOne.getFooList();
//Sends the request asynchronously and returns the Completable Future of type Foo which can be later used to get the results using `get` or `join`
private List<CompletableFuture<Foo>> sendRequest() {
return fooData.stream().map(this::computeAsync).collect(Collectors.toList());
}
private CompletableFuture<Foo> computeAsync(Foo f) {
return CompletableFuture.supplyAsync(() ->
CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f
});
private List<Foo> processResponse() {
List<CompletableFuture<Foo>> futureResult = sendRequest();
List<Foo> fooList = futureResult.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
return fooList;
}
}
我遇到了 stream().forEach
问题,它没有在方法 returns 之前及时完成,方法如下:
我的实体:
@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Foo {
private String a;
private int b;
private int c;
private String d;
private String e;
}
我有一个调用外部服务的方法,它获取 Foo
的列表,然后对于该列表的每个成员,它调用另外两个外部服务来填充 d
和 e
字段:
public List<Foo> getOrdresListe() {
Foo[] fooArray = externalServiceOne.getFooList();
Arrays.stream(fooArray).forEach((f) -> {
CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f;
}));
});
List<Foo> fooList = Arrays.asList(fooArray);
return fooList; // when this statement is reached d and e fields are null.
}
由于一些性能问题(和一些最佳实践),我在调用服务时与自定义供应商异步调用 externalServiceTwo.getDeeEntity
和 externalServiceThree.getEeeEntity
以启动一些依赖项。但主要问题是返回 fooList
时 d
和 e
字段为空。
我的问题是如何在返回之前等待所有异步执行完成 fooList
?
为什么不创建所有 CompletableFuture
作为中间操作并等待所有异步执行在终端操作中完成。
Arrays.stream(fooArray).map((f) -> CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f;
})
)).forEach(fooCompletableFuture -> {
try {
fooCompletableFuture.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
您可以只记住异步任务并手动等待它们完成:
public List<Foo> getOrdresListe() {
Foo[] fooArray = externalServiceOne.getFooList();
final List<CompletableFuture<Foo>> futures = new ArrayList<>(); // remember each
Arrays.stream(fooArray).forEach((f) -> {
futures.add(CompletableFuture.supplyAsync(
AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f;
})));
});
for (CompletableFuture<Foo> future : futures) {
future.join(); // wait for each
}
List<Foo> fooList = Arrays.asList(fooArray);
return fooList;
}
可能会尝试这样的事情:发送异步请求并创建 Foo
类型的可完成期货列表,然后按照@boobalan 的建议使用 join
作为终端操作;您以这种方式获得结果,您可以等待所有异步执行完成后再返回 fooList。
List<foo> fooData = externalServiceOne.getFooList();
//Sends the request asynchronously and returns the Completable Future of type Foo which can be later used to get the results using `get` or `join`
private List<CompletableFuture<Foo>> sendRequest() {
return fooData.stream().map(this::computeAsync).collect(Collectors.toList());
}
private CompletableFuture<Foo> computeAsync(Foo f) {
return CompletableFuture.supplyAsync(() ->
CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
Dob dob = externalServiceTwo.getDeeEntity(f.getA());
f.setD(dob.getD());
Efo efo = externalServiceThree.getEeeEntity(f.getA());
f.setE(efo.getE());
return f
});
private List<Foo> processResponse() {
List<CompletableFuture<Foo>> futureResult = sendRequest();
List<Foo> fooList = futureResult.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.collect(Collectors.toList());
return fooList;
}
}