Vertx CompositeFuture:完成所有期货

Vertx CompositeFuture: on completion of all Futures

在 Vert.x 网络服务器中,我有一组 Futures,每个 Futures 都可能失败或成功并保存一个结果。我对每一个 Future 的结果(可能还有结果)感兴趣,这意味着我需要处理每个 Future 的结果。

我认为 Vert.x 的 CompositeFuture 是可行的方法,这是我的代码片段:

List<Future> futures = dataProviders.stream()
    .filter(dp -> dp.isActive(requester))
    .map(DataProvider::getData)
    .collect(Collectors.toList());

CompositeFuture.all(futures)
        .onComplete(ar -> {
            if(ar.failed()) {
                routingContext.response()
                    .end(ar.cause());
                return;
            }
            
            CompositeFuture cf = ar.result();
            JsonArray data = new JsonArray();
            for(int i = 0; i < cf.size(); i++) {
                if(cf.failed(i)) {
                    final JsonObject errorJson = new JsonObject();
                    errorJson.put("error", cf.cause(i).getMessage());
                    data.add(errorJson);
                } else {
                    data.add(((Data) cf.resultAt(i)).toJson());
                }
            }

            JsonObject res = new JsonObject()
                .put("data", data);

            routingContext.response()
                    .putHeader("Content-Type", "application/json")
                    .end(res.toString());
        });

但是我遇到了以下问题:

什么是 correct/best 等待 Futures 列表完成,同时能够单独处理每个结果的方法?

最简单的方法是您自己处理结果。您可以为您的期货注册一个 onSuccess 处理程序。这样,结果将被放入某种列表中,例如JsonArray.

List<Future> futures = //...list of futures

JsonArray results = new JsonArray();
futures.forEach(e -> e.onSuccess(h -> results.add(h)));

CompositeFuture.all(futures)
    .onComplete(ar -> {
        if(ar.failed()) {
            // successful elements are present in "results"
            routingContext.response().end(results.encode());
            return;
        }
        //... rest of your code
     });

您还可以查看 rx-java 库。使用它通常可以更好地实现此类用例。

使用 all:

时,您可以简单地戳一下原始期货的结果
List<Future> futures = //...list of futures

CompositeFuture.all(futures).onComplete(ar -> {
  if(ar.succeeded()){
    futures.forEach(fut -> log.info( fut.succeded() +" / " +_fut.result() ));
  }
} );