如何等待封闭的异步块完成

How to wait for enclosed asynchronous block to finish

我遇到了 stream().forEach 问题,它没有在方法 returns 之前及时完成,方法如下:

我的实体:

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
@ToString
public class Foo {
    private String a;
    private int b;
    private int c;
    private String d;
    private String e;
}

我有一个调用外部服务的方法,它获取 Foo 的列表,然后对于该列表的每个成员,它调用另外两个外部服务来填充 de 字段:

public List<Foo> getOrdresListe() {
    Foo[] fooArray = externalServiceOne.getFooList();
    Arrays.stream(fooArray).forEach((f) -> {
        CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
            Dob dob = externalServiceTwo.getDeeEntity(f.getA());
            f.setD(dob.getD());
            Efo efo = externalServiceThree.getEeeEntity(f.getA());
            f.setE(efo.getE());
            return f;
        }));
    });
    List<Foo> fooList = Arrays.asList(fooArray);
    return fooList; // when this statement is reached d and e fields are null.
}

由于一些性能问题(和一些最佳实践),我在调用服务时与自定义供应商异步调用 externalServiceTwo.getDeeEntityexternalServiceThree.getEeeEntity 以启动一些依赖项。但主要问题是返回 fooListde 字段为空。

我的问题是如何在返回之前等待所有异步执行完成 fooList

为什么不创建所有 CompletableFuture 作为中间操作并等待所有异步执行在终端操作中完成。

Arrays.stream(fooArray).map((f) -> CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
                Dob dob = externalServiceTwo.getDeeEntity(f.getA());
                f.setD(dob.getD());
                Efo efo = externalServiceThree.getEeeEntity(f.getA());
                f.setE(efo.getE());
                return f;
            })
    )).forEach(fooCompletableFuture -> {
        try {
            fooCompletableFuture.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    });

您可以只记住异步任务并手动等待它们完成:

public List<Foo> getOrdresListe() {
    Foo[] fooArray = externalServiceOne.getFooList();

    final List<CompletableFuture<Foo>> futures = new ArrayList<>(); // remember each

    Arrays.stream(fooArray).forEach((f) -> {
        futures.add(CompletableFuture.supplyAsync(
                AsyncUtils.supplierWithCustomDependencies(() -> {
            Dob dob = externalServiceTwo.getDeeEntity(f.getA());
            f.setD(dob.getD());
            Efo efo = externalServiceThree.getEeeEntity(f.getA());
            f.setE(efo.getE());
            return f;
        })));
    });

    for (CompletableFuture<Foo> future : futures) {
        future.join(); // wait for each
    }

    List<Foo> fooList = Arrays.asList(fooArray);
    return fooList;
}

可能会尝试这样的事情:发送异步请求并创建 Foo 类型的可完成期货列表,然后按照@boobalan 的建议使用 join 作为终端操作;您以这种方式获得结果,您可以等待所有异步执行完成后再返回 fooList。


List<foo> fooData = externalServiceOne.getFooList();

//Sends the request asynchronously and returns the Completable Future of type Foo which can be later used to get the results using `get` or `join`
private List<CompletableFuture<Foo>> sendRequest() {
    return fooData.stream().map(this::computeAsync).collect(Collectors.toList());
}

private CompletableFuture<Foo> computeAsync(Foo f) {
    return CompletableFuture.supplyAsync(() -> 
CompletableFuture.supplyAsync(AsyncUtils.supplierWithCustomDependencies(() -> {
                Dob dob = externalServiceTwo.getDeeEntity(f.getA());
                f.setD(dob.getD());
                Efo efo = externalServiceThree.getEeeEntity(f.getA());
                f.setE(efo.getE());
                return f 
           });

private List<Foo> processResponse() {
   List<CompletableFuture<Foo>> futureResult = sendRequest();

   List<Foo> fooList = futureResult.stream()
            .map(CompletableFuture::join)
            .filter(Objects::nonNull)
            .collect(Collectors.toList());
     return fooList;
    }

}