Vertx CompositeFuture:完成所有期货
Vertx CompositeFuture: on completion of all Futures
在 Vert.x 网络服务器中,我有一组 Futures,每个 Futures 都可能失败或成功并保存一个结果。我对每一个 Future 的结果(可能还有结果)感兴趣,这意味着我需要处理每个 Future 的结果。
我认为 Vert.x 的 CompositeFuture
是可行的方法,这是我的代码片段:
List<Future> futures = dataProviders.stream()
.filter(dp -> dp.isActive(requester))
.map(DataProvider::getData)
.collect(Collectors.toList());
CompositeFuture.all(futures)
.onComplete(ar -> {
if(ar.failed()) {
routingContext.response()
.end(ar.cause());
return;
}
CompositeFuture cf = ar.result();
JsonArray data = new JsonArray();
for(int i = 0; i < cf.size(); i++) {
if(cf.failed(i)) {
final JsonObject errorJson = new JsonObject();
errorJson.put("error", cf.cause(i).getMessage());
data.add(errorJson);
} else {
data.add(((Data) cf.resultAt(i)).toJson());
}
}
JsonObject res = new JsonObject()
.put("data", data);
routingContext.response()
.putHeader("Content-Type", "application/json")
.end(res.toString());
});
但是我遇到了以下问题:
- 使用
CompositeFuture.all(futures).onComplete()
,只要 futures
中的任何 Future 失败,我就不会立即获得成功 Future 的结果(因为那时 ar.result()
为空)。
- 使用
CompositeFuture.any(futures).onComplete()
,我会得到所有结果,但 CompositeFuture 在 futures
的所有 Future 完成之前完成。意思是,它不会等待每个 Future 完成,而是在任何 Future 完成后立即完成。 (-> cf.resultAt(i)
returns 空)
- 使用
CompositeFuture.join(futures).onComplete()
,它与 all()
相同:只要任何 Future 失败,ar.result()
就为 null。
什么是 correct/best 等待 Futures 列表完成,同时能够单独处理每个结果的方法?
最简单的方法是您自己处理结果。您可以为您的期货注册一个 onSuccess
处理程序。这样,结果将被放入某种列表中,例如JsonArray
.
List<Future> futures = //...list of futures
JsonArray results = new JsonArray();
futures.forEach(e -> e.onSuccess(h -> results.add(h)));
CompositeFuture.all(futures)
.onComplete(ar -> {
if(ar.failed()) {
// successful elements are present in "results"
routingContext.response().end(results.encode());
return;
}
//... rest of your code
});
您还可以查看 rx-java
库。使用它通常可以更好地实现此类用例。
使用 all
:
时,您可以简单地戳一下原始期货的结果
List<Future> futures = //...list of futures
CompositeFuture.all(futures).onComplete(ar -> {
if(ar.succeeded()){
futures.forEach(fut -> log.info( fut.succeded() +" / " +_fut.result() ));
}
} );
在 Vert.x 网络服务器中,我有一组 Futures,每个 Futures 都可能失败或成功并保存一个结果。我对每一个 Future 的结果(可能还有结果)感兴趣,这意味着我需要处理每个 Future 的结果。
我认为 Vert.x 的 CompositeFuture
是可行的方法,这是我的代码片段:
List<Future> futures = dataProviders.stream()
.filter(dp -> dp.isActive(requester))
.map(DataProvider::getData)
.collect(Collectors.toList());
CompositeFuture.all(futures)
.onComplete(ar -> {
if(ar.failed()) {
routingContext.response()
.end(ar.cause());
return;
}
CompositeFuture cf = ar.result();
JsonArray data = new JsonArray();
for(int i = 0; i < cf.size(); i++) {
if(cf.failed(i)) {
final JsonObject errorJson = new JsonObject();
errorJson.put("error", cf.cause(i).getMessage());
data.add(errorJson);
} else {
data.add(((Data) cf.resultAt(i)).toJson());
}
}
JsonObject res = new JsonObject()
.put("data", data);
routingContext.response()
.putHeader("Content-Type", "application/json")
.end(res.toString());
});
但是我遇到了以下问题:
- 使用
CompositeFuture.all(futures).onComplete()
,只要futures
中的任何 Future 失败,我就不会立即获得成功 Future 的结果(因为那时ar.result()
为空)。 - 使用
CompositeFuture.any(futures).onComplete()
,我会得到所有结果,但 CompositeFuture 在futures
的所有 Future 完成之前完成。意思是,它不会等待每个 Future 完成,而是在任何 Future 完成后立即完成。 (->cf.resultAt(i)
returns 空) - 使用
CompositeFuture.join(futures).onComplete()
,它与all()
相同:只要任何 Future 失败,ar.result()
就为 null。
什么是 correct/best 等待 Futures 列表完成,同时能够单独处理每个结果的方法?
最简单的方法是您自己处理结果。您可以为您的期货注册一个 onSuccess
处理程序。这样,结果将被放入某种列表中,例如JsonArray
.
List<Future> futures = //...list of futures
JsonArray results = new JsonArray();
futures.forEach(e -> e.onSuccess(h -> results.add(h)));
CompositeFuture.all(futures)
.onComplete(ar -> {
if(ar.failed()) {
// successful elements are present in "results"
routingContext.response().end(results.encode());
return;
}
//... rest of your code
});
您还可以查看 rx-java
库。使用它通常可以更好地实现此类用例。
使用 all
:
List<Future> futures = //...list of futures
CompositeFuture.all(futures).onComplete(ar -> {
if(ar.succeeded()){
futures.forEach(fut -> log.info( fut.succeded() +" / " +_fut.result() ));
}
} );