如何使用 RXJava 顺序链接 Vertx CompositeFuture?

How to sequentially chain Vertx CompositeFuture using RXJava?

我需要按顺序链接以 RxJava 样式的 Vertx CompositeFutures 来依赖 CompositeFuture,避免回调地狱。


每个 CompositeFuture.any/all 做一些 return 未来的异步操作,比方说 myList1、myList2、myList3,但我 必须等待 t CompositeFuture.any(myList1) 完成 return 成功 然后再执行 CompositeFuture.any(myList2),从 myList2 到 myList3 也是如此。当然,CompositeFuture 本身会异步执行作业,但只是针对它的一组操作,因为下一组必须在第一组顺利完成后立即完成。


    public static void myFunc(Vertx vertx, Handler<AsyncResult<CompositeFuture>> asyncResultHandler) {

        CompositeFuture.any(myList1 < Future >)
                .onComplete(ar1 -> {
                    if (!ar1.succeeded()) {
                    } else {
                        CompositeFuture.any(myList2 < Future >)
                                .onComplete(ar2 -> {
                                            if (!ar2.succeeded()) {
                                            } else {
                                                CompositeFuture.all(myList3 < Future >)
                                                        .onComplete(ar3 -> {


    public static void myFunc(Vertx vertx, Handler<AsyncResult<CompositeFuture>> asyncResultHandler) {
                .just(CompositeFuture.any(myList1 < Future >))
                .flatMap(previousFuture -> rxComposeAny(previousFuture, myList2 < Future >))
                .flatMap(previousFuture -> rxComposeAll(previousFuture, myList3 < Future >))

    public static Single<CompositeFuture> rxComposeAny(CompositeFuture previousResult, List<Future> myList) {
        if (previousResult.failed()) return Single.just(previousResult); // See explanation bellow

        CompositeFuture compositeFuture = CompositeFuture.any(myList);
        return Single.just(compositeFuture);

    public static Single<CompositeFuture> rxComposeAll(CompositeFuture previousResult, List<Future> myList) {
        if (previousResult.failed()) return Single.just(previousResult);

        CompositeFuture compositeFuture = CompositeFuture.any(myList);
        return Single.just(compositeFuture);

更加简洁明了。 但是,我没有成功将之前的失败传递给 asyncResultHandler。

我的想法是这样的:flatMap通过了之前的CompositeFuture结果,我想检查它是否失败了。下一个 rxComposeAny/All 首先检查前一个是否失败,如果是,则只 return 失败的 CompositeFuture 等等,直到它命中订阅者中的处理程序。如果前一个通过了测试,我可以继续传递当前结果,直到最后一个成功的 CompositeFuture 命中处理程序。


        if (previousResult.failed()) return Single.just(previousResult); // See explanation bellow

不起作用,所有 CompositeFutures 都已处理,但未测试是否成功完成,只有最后一个最终被传递给 asyncResultHandler,它将测试整体失败(但在我的代码中,它最终只检查了最后一个)

我正在使用 Vertx 3.9.0 和 RxJava 2 Vertx API。

披露:我在 Vertx 方面有经验,但我在 RxJava 方面是全新的。所以我很感激任何答案,从技术解决方案到概念解释。


编辑(在@homerman 的出色回应之后): 我需要具有与顺序依赖的 CompositeFutures 的“回调地狱风格”完全相同的行为,即 必须在 onComplete 之后调用 next 并测试是否完成失败或成功。复杂性来自以下事实:

  1. 我必须使用 vertx CompositeAll/Any 方法,而不是 zip。 Zip 提供类似于 CompositeAll 的行为,但不是 CompositeAny。
  2. CompositeAll/Any return 完成的未来就在 onComplete 方法中。如果我之前像上面显示的那样检查它,因为它是异步的,我会得到未解决的期货。
  3. CompositeAll/Any 如果失败不会抛出错误,但在 onComplete 内部失败,所以我不能使用 rxJava 的 onError。

例如,我在 rxComposite 函数中尝试了以下更改:

    public static Single<CompositeFuture> rxLoadVerticlesAny(CompositeFuture previousResult, Vertx vertx, String deploymentName,
                                                             List<Class<? extends Verticle>> verticles, JsonObject config) {
        previousResult.onComplete(event -> {
                    if (event.failed()) {
                        return Single.just(previousResult);

                    } else {
                        CompositeFuture compositeFuture = CompositeFuture.any(VertxDeployHelper.deploy(vertx, verticles, config));
                        return Single.just(compositeFuture);

但它自然不会编译,因为 lambda 是无效的。如何在 Vertx 中重现与 rxJava 完全相同的行为?


Each CompositeFuture.any/all do some async operations that return futures, lets say myList1, myList2, myList3, but I must wait for CompositeFuture.any(myList1) to complete and return success before doing CompositeFuture.any(myList2), and the same from myList2 to myList3.

您提供了 CompositeFuture.any()CompositeFuture.all() 作为参考点,但您描述的行为与 all() 一致,也就是说生成的组合只会产生成功如果 所有 它的成员都这样做。

出于我的回答目的,我假设 all() 是您期望的行为。

在 RxJava 中,由异常触发的意外错误将导致流终止,底层异常通过 onError() 回调传递给观察者。


final Single<String> a1 = Single.just("Batch-A-Operation-1");
final Single<String> a2 = Single.just("Batch-A-Operation-2");
final Single<String> a3 = Single.just("Batch-A-Operation-3");

final Single<String> b1 = Single.just("Batch-B-Operation-1");
final Single<String> b2 = Single.just("Batch-B-Operation-2");
final Single<String> b3 = Single.just("Batch-B-Operation-3");

final Single<String> c1 = Single.just("Batch-C-Operation-1");
final Single<String> c2 = Single.just("Batch-C-Operation-2");
final Single<String> c3 = Single.just("Batch-C-Operation-3");



    .zip(a1, a2, a3, (s, s2, s3) -> {
      return "A's completed successfully";
    .flatMap((Function<String, SingleSource<String>>) s -> {
      throw new RuntimeException("B's failed");
    .flatMap((Function<String, SingleSource<String>>) s -> {
      return Single.zip(c1, c2, c3, (one, two, three) -> "C's completed successfully");
        s -> System.out.println("## onSuccess(" + s + ")"),
        t -> System.out.println("## onError(" + t.getMessage() + ")")

(如果您不熟悉,可以使用 zip() 运算符组合作为输入提供的所有源的结果以发出 another/new 源)。


  • 流在 B 的执行期间终止
  • 异常被报告给观察者(即onError()处理程序被触发)
  • C 从未被处理


class State {
  final String value;
  final Throwable error;

  State(String value, Throwable error) {
    this.value = value;
    this.error = error;


    .zip(a1, a2, a3, (s, s2, s3) -> {
      try {
        // Execute the A's here...
        return new State("A's completed successfully", null);

      } catch(Throwable t) {
        return new State(null, t);
    .flatMap((Function<State, SingleSource<State>>) s -> {
      if(s.error != null) {
        // If an error occurred upstream, skip this batch...
        return Single.just(s);

      } else {
        try {
          // ...otherwise, execute the B's
          return Single.just(new State("B's completed successfully", null));
        } catch(Throwable t) {
          return Single.just(new State(null, t));
    .flatMap((Function<State, SingleSource<State>>) s -> {
      if(s.error != null) {
        // If an error occurred upstream, skip this batch...
        return Single.just(s);

      } else {
        try {
          // ...otherwise, execute the C's
          return Single.just(new State("C's completed successfully", null));

        } catch(Throwable t) {
          return Single.just(new State(null, t));
        s -> {
          if(s.error != null) {
            System.out.println("## onSuccess with error: " + s.error.getMessage());
          } else {
            System.out.println("## onSuccess without error: " + s.value);
        t -> System.out.println("## onError(" + t.getMessage() + ")")

在对 Vertx 源代码进行一些研究之后,我发现了一个 public 方法,CompositeFuture 的 rx 版本使用该方法将 'traditional' CompositeFuture 转换为其 rx 版本。方法是io.vertx.reactivex.core.CompositeFuture.newInstance。通过这种解决方法,我可以使用我的传统方法,然后将其转换为在 rx 链中使用。这就是我想要的,因为改变现有的传统方法是有问题的。


        .flatMap(config -> {
            return rxComposeAny(vertx, config)
                    .flatMap(r -> rxComposeAny(vertx, config))
                    .flatMap(r -> rxComposeAll(vertx, config));
                compositeFuture -> {
                    compositeFuture.onSuccess(event -> startPromise.complete());
                error -> startPromise.fail(error));

public static Single<JsonObject> rxGetConfig(Vertx vertx) {
    ConfigRetrieverOptions enrichConfigRetrieverOptions = getEnrichConfigRetrieverOptions();
    // the reason we create new vertx is just to get an instance that is rx
    // so this ConfigRetriever is from io.vertx.reactivex.config, instead of normal io.vertx.config
    ConfigRetriever configRetriever = ConfigRetriever.create(io.vertx.reactivex.core.Vertx.newInstance(vertx), enrichConfigRetrieverOptions);

    return configRetriever.rxGetConfig();

public static Single<io.vertx.reactivex.core.CompositeFuture> rxComposeAny(Vertx vertx, JsonObject config) {

    // instead of adapted all the parameters of myMethodsThatReturnsFutures to be rx compliant, 
    // we create it 'normally' and the converts bellow to rx CompositeFuture
    CompositeFuture compositeFuture = CompositeFuture.any(myMethodsThatReturnsFutures(config));

    return io.vertx.reactivex.core.CompositeFuture