如何使用 RXJava 顺序链接 Vertx CompositeFuture?
How to sequentially chain Vertx CompositeFuture using RXJava?
我需要按顺序链接以 RxJava 样式的 Vertx CompositeFutures 来依赖 CompositeFuture,避免回调地狱。
用例:
每个 CompositeFuture.any/all 做一些 return 未来的异步操作,比方说 myList1、myList2、myList3,但我 必须等待 t CompositeFuture.any(myList1) 完成 return 成功 然后再执行 CompositeFuture.any(myList2),从 myList2 到 myList3 也是如此。当然,CompositeFuture 本身会异步执行作业,但只是针对它的一组操作,因为下一组必须在第一组顺利完成后立即完成。
以“回调地狱风格”进行操作将是:
public static void myFunc(Vertx vertx, Handler<AsyncResult<CompositeFuture>> asyncResultHandler) {
CompositeFuture.any(myList1 < Future >)
.onComplete(ar1 -> {
if (!ar1.succeeded()) {
asyncResultHandler.handle(ar1);
} else {
CompositeFuture.any(myList2 < Future >)
.onComplete(ar2 -> {
if (!ar2.succeeded()) {
asyncResultHandler.handle(ar2);
} else {
CompositeFuture.all(myList3 < Future >)
.onComplete(ar3 -> {
asyncResultHandler.handle(ar3);
.... <ARROW OF CLOSING BRACKETS> ...
}
现在我尝试了这样的事情:
public static void myFunc(Vertx vertx, Handler<AsyncResult<CompositeFuture>> asyncResultHandler) {
Single
.just(CompositeFuture.any(myList1 < Future >))
.flatMap(previousFuture -> rxComposeAny(previousFuture, myList2 < Future >))
.flatMap(previousFuture -> rxComposeAll(previousFuture, myList3 < Future >))
.subscribe(SingleHelper.toObserver(asyncResultHandler));
}
public static Single<CompositeFuture> rxComposeAny(CompositeFuture previousResult, List<Future> myList) {
if (previousResult.failed()) return Single.just(previousResult); // See explanation bellow
CompositeFuture compositeFuture = CompositeFuture.any(myList);
return Single.just(compositeFuture);
}
public static Single<CompositeFuture> rxComposeAll(CompositeFuture previousResult, List<Future> myList) {
if (previousResult.failed()) return Single.just(previousResult);
CompositeFuture compositeFuture = CompositeFuture.any(myList);
return Single.just(compositeFuture);
}
}
更加简洁明了。 但是,我没有成功将之前的失败传递给 asyncResultHandler。
我的想法是这样的:flatMap通过了之前的CompositeFuture结果,我想检查它是否失败了。下一个 rxComposeAny/All 首先检查前一个是否失败,如果是,则只 return 失败的 CompositeFuture 等等,直到它命中订阅者中的处理程序。如果前一个通过了测试,我可以继续传递当前结果,直到最后一个成功的 CompositeFuture 命中处理程序。
问题是检查
if (previousResult.failed()) return Single.just(previousResult); // See explanation bellow
不起作用,所有 CompositeFutures 都已处理,但未测试是否成功完成,只有最后一个最终被传递给 asyncResultHandler,它将测试整体失败(但在我的代码中,它最终只检查了最后一个)
我正在使用 Vertx 3.9.0 和 RxJava 2 Vertx API。
披露:我在 Vertx 方面有经验,但我在 RxJava 方面是全新的。所以我很感激任何答案,从技术解决方案到概念解释。
谢谢。
编辑(在@homerman 的出色回应之后):
我需要具有与顺序依赖的 CompositeFutures 的“回调地狱风格”完全相同的行为,即 必须在 onComplete 之后调用 next 并测试是否完成失败或成功。复杂性来自以下事实:
- 我必须使用 vertx CompositeAll/Any 方法,而不是 zip。 Zip 提供类似于 CompositeAll 的行为,但不是 CompositeAny。
- CompositeAll/Any return 完成的未来就在 onComplete 方法中。如果我之前像上面显示的那样检查它,因为它是异步的,我会得到未解决的期货。
- CompositeAll/Any 如果失败不会抛出错误,但在 onComplete 内部失败,所以我不能使用 rxJava 的 onError。
例如,我在 rxComposite 函数中尝试了以下更改:
public static Single<CompositeFuture> rxLoadVerticlesAny(CompositeFuture previousResult, Vertx vertx, String deploymentName,
List<Class<? extends Verticle>> verticles, JsonObject config) {
previousResult.onComplete(event -> {
if (event.failed()) {
return Single.just(previousResult);
} else {
CompositeFuture compositeFuture = CompositeFuture.any(VertxDeployHelper.deploy(vertx, verticles, config));
return Single.just(compositeFuture);
}
}
);
}
但它自然不会编译,因为 lambda 是无效的。如何在 Vertx 中重现与 rxJava 完全相同的行为?
澄清一些事情...
Each CompositeFuture.any/all do some async operations that return
futures, lets say myList1, myList2, myList3, but I must wait for
CompositeFuture.any(myList1) to complete and return success before
doing CompositeFuture.any(myList2), and the same from myList2 to
myList3.
您提供了 CompositeFuture.any()
和 CompositeFuture.all()
作为参考点,但您描述的行为与 all()
一致,也就是说生成的组合只会产生成功如果 所有 它的成员都这样做。
出于我的回答目的,我假设 all()
是您期望的行为。
在 RxJava 中,由异常触发的意外错误将导致流终止,底层异常通过 onError()
回调传递给观察者。
作为一个小演示,假设以下设置:
final Single<String> a1 = Single.just("Batch-A-Operation-1");
final Single<String> a2 = Single.just("Batch-A-Operation-2");
final Single<String> a3 = Single.just("Batch-A-Operation-3");
final Single<String> b1 = Single.just("Batch-B-Operation-1");
final Single<String> b2 = Single.just("Batch-B-Operation-2");
final Single<String> b3 = Single.just("Batch-B-Operation-3");
final Single<String> c1 = Single.just("Batch-C-Operation-1");
final Single<String> c2 = Single.just("Batch-C-Operation-2");
final Single<String> c3 = Single.just("Batch-C-Operation-3");
每个Single
代表一个要执行的离散操作,它们根据一些逻辑分组进行逻辑命名(即它们应该一起执行)。例如,“Batch-A”对应您的“myList1”,“Batch-B”对应您的“myList2”,...
假定以下流:
Single
.zip(a1, a2, a3, (s, s2, s3) -> {
return "A's completed successfully";
})
.flatMap((Function<String, SingleSource<String>>) s -> {
throw new RuntimeException("B's failed");
})
.flatMap((Function<String, SingleSource<String>>) s -> {
return Single.zip(c1, c2, c3, (one, two, three) -> "C's completed successfully");
})
.subscribe(
s -> System.out.println("## onSuccess(" + s + ")"),
t -> System.out.println("## onError(" + t.getMessage() + ")")
);
(如果您不熟悉,可以使用 zip() 运算符组合作为输入提供的所有源的结果以发出 another/new 源)。
在这个流中,因为B的处理最终抛出异常:
- 流在 B 的执行期间终止
- 异常被报告给观察者(即
onError()
处理程序被触发)
- C 从未被处理
但是,如果您想要自己决定是否执行每个分支,您可以采取的一种方法是使用某种状态持有者将先前操作的结果向下传递,如下所示:
class State {
final String value;
final Throwable error;
State(String value, Throwable error) {
this.value = value;
this.error = error;
}
}
然后可以修改流以有条件地执行不同的批处理,例如:
Single
.zip(a1, a2, a3, (s, s2, s3) -> {
try {
// Execute the A's here...
return new State("A's completed successfully", null);
} catch(Throwable t) {
return new State(null, t);
}
})
.flatMap((Function<State, SingleSource<State>>) s -> {
if(s.error != null) {
// If an error occurred upstream, skip this batch...
return Single.just(s);
} else {
try {
// ...otherwise, execute the B's
return Single.just(new State("B's completed successfully", null));
} catch(Throwable t) {
return Single.just(new State(null, t));
}
}
})
.flatMap((Function<State, SingleSource<State>>) s -> {
if(s.error != null) {
// If an error occurred upstream, skip this batch...
return Single.just(s);
} else {
try {
// ...otherwise, execute the C's
return Single.just(new State("C's completed successfully", null));
} catch(Throwable t) {
return Single.just(new State(null, t));
}
}
})
.subscribe(
s -> {
if(s.error != null) {
System.out.println("## onSuccess with error: " + s.error.getMessage());
} else {
System.out.println("## onSuccess without error: " + s.value);
}
},
t -> System.out.println("## onError(" + t.getMessage() + ")")
);
在对 Vertx 源代码进行一些研究之后,我发现了一个 public 方法,CompositeFuture 的 rx 版本使用该方法将 'traditional' CompositeFuture 转换为其 rx 版本。方法是io.vertx.reactivex.core.CompositeFuture.newInstance。通过这种解决方法,我可以使用我的传统方法,然后将其转换为在 rx 链中使用。这就是我想要的,因为改变现有的传统方法是有问题的。
这是带注释的代码:
rxGetConfig(vertx)
.flatMap(config -> {
return rxComposeAny(vertx, config)
.flatMap(r -> rxComposeAny(vertx, config))
.flatMap(r -> rxComposeAll(vertx, config));
})
.subscribe(
compositeFuture -> {
compositeFuture.onSuccess(event -> startPromise.complete());
},
error -> startPromise.fail(error));
public static Single<JsonObject> rxGetConfig(Vertx vertx) {
ConfigRetrieverOptions enrichConfigRetrieverOptions = getEnrichConfigRetrieverOptions();
// the reason we create new vertx is just to get an instance that is rx
// so this ConfigRetriever is from io.vertx.reactivex.config, instead of normal io.vertx.config
ConfigRetriever configRetriever = ConfigRetriever.create(io.vertx.reactivex.core.Vertx.newInstance(vertx), enrichConfigRetrieverOptions);
return configRetriever.rxGetConfig();
}
public static Single<io.vertx.reactivex.core.CompositeFuture> rxComposeAny(Vertx vertx, JsonObject config) {
// instead of adapted all the parameters of myMethodsThatReturnsFutures to be rx compliant,
// we create it 'normally' and the converts bellow to rx CompositeFuture
CompositeFuture compositeFuture = CompositeFuture.any(myMethodsThatReturnsFutures(config));
return io.vertx.reactivex.core.CompositeFuture
.newInstance(compositeFuture)
.rxOnComplete();
}
我需要按顺序链接以 RxJava 样式的 Vertx CompositeFutures 来依赖 CompositeFuture,避免回调地狱。
用例:
每个 CompositeFuture.any/all 做一些 return 未来的异步操作,比方说 myList1、myList2、myList3,但我 必须等待 t CompositeFuture.any(myList1) 完成 return 成功 然后再执行 CompositeFuture.any(myList2),从 myList2 到 myList3 也是如此。当然,CompositeFuture 本身会异步执行作业,但只是针对它的一组操作,因为下一组必须在第一组顺利完成后立即完成。
以“回调地狱风格”进行操作将是:
public static void myFunc(Vertx vertx, Handler<AsyncResult<CompositeFuture>> asyncResultHandler) {
CompositeFuture.any(myList1 < Future >)
.onComplete(ar1 -> {
if (!ar1.succeeded()) {
asyncResultHandler.handle(ar1);
} else {
CompositeFuture.any(myList2 < Future >)
.onComplete(ar2 -> {
if (!ar2.succeeded()) {
asyncResultHandler.handle(ar2);
} else {
CompositeFuture.all(myList3 < Future >)
.onComplete(ar3 -> {
asyncResultHandler.handle(ar3);
.... <ARROW OF CLOSING BRACKETS> ...
}
现在我尝试了这样的事情:
public static void myFunc(Vertx vertx, Handler<AsyncResult<CompositeFuture>> asyncResultHandler) {
Single
.just(CompositeFuture.any(myList1 < Future >))
.flatMap(previousFuture -> rxComposeAny(previousFuture, myList2 < Future >))
.flatMap(previousFuture -> rxComposeAll(previousFuture, myList3 < Future >))
.subscribe(SingleHelper.toObserver(asyncResultHandler));
}
public static Single<CompositeFuture> rxComposeAny(CompositeFuture previousResult, List<Future> myList) {
if (previousResult.failed()) return Single.just(previousResult); // See explanation bellow
CompositeFuture compositeFuture = CompositeFuture.any(myList);
return Single.just(compositeFuture);
}
public static Single<CompositeFuture> rxComposeAll(CompositeFuture previousResult, List<Future> myList) {
if (previousResult.failed()) return Single.just(previousResult);
CompositeFuture compositeFuture = CompositeFuture.any(myList);
return Single.just(compositeFuture);
}
}
更加简洁明了。 但是,我没有成功将之前的失败传递给 asyncResultHandler。
我的想法是这样的:flatMap通过了之前的CompositeFuture结果,我想检查它是否失败了。下一个 rxComposeAny/All 首先检查前一个是否失败,如果是,则只 return 失败的 CompositeFuture 等等,直到它命中订阅者中的处理程序。如果前一个通过了测试,我可以继续传递当前结果,直到最后一个成功的 CompositeFuture 命中处理程序。
问题是检查
if (previousResult.failed()) return Single.just(previousResult); // See explanation bellow
不起作用,所有 CompositeFutures 都已处理,但未测试是否成功完成,只有最后一个最终被传递给 asyncResultHandler,它将测试整体失败(但在我的代码中,它最终只检查了最后一个)
我正在使用 Vertx 3.9.0 和 RxJava 2 Vertx API。
披露:我在 Vertx 方面有经验,但我在 RxJava 方面是全新的。所以我很感激任何答案,从技术解决方案到概念解释。
谢谢。
编辑(在@homerman 的出色回应之后): 我需要具有与顺序依赖的 CompositeFutures 的“回调地狱风格”完全相同的行为,即 必须在 onComplete 之后调用 next 并测试是否完成失败或成功。复杂性来自以下事实:
- 我必须使用 vertx CompositeAll/Any 方法,而不是 zip。 Zip 提供类似于 CompositeAll 的行为,但不是 CompositeAny。
- CompositeAll/Any return 完成的未来就在 onComplete 方法中。如果我之前像上面显示的那样检查它,因为它是异步的,我会得到未解决的期货。
- CompositeAll/Any 如果失败不会抛出错误,但在 onComplete 内部失败,所以我不能使用 rxJava 的 onError。
例如,我在 rxComposite 函数中尝试了以下更改:
public static Single<CompositeFuture> rxLoadVerticlesAny(CompositeFuture previousResult, Vertx vertx, String deploymentName,
List<Class<? extends Verticle>> verticles, JsonObject config) {
previousResult.onComplete(event -> {
if (event.failed()) {
return Single.just(previousResult);
} else {
CompositeFuture compositeFuture = CompositeFuture.any(VertxDeployHelper.deploy(vertx, verticles, config));
return Single.just(compositeFuture);
}
}
);
}
但它自然不会编译,因为 lambda 是无效的。如何在 Vertx 中重现与 rxJava 完全相同的行为?
澄清一些事情...
Each CompositeFuture.any/all do some async operations that return futures, lets say myList1, myList2, myList3, but I must wait for CompositeFuture.any(myList1) to complete and return success before doing CompositeFuture.any(myList2), and the same from myList2 to myList3.
您提供了 CompositeFuture.any()
和 CompositeFuture.all()
作为参考点,但您描述的行为与 all()
一致,也就是说生成的组合只会产生成功如果 所有 它的成员都这样做。
出于我的回答目的,我假设 all()
是您期望的行为。
在 RxJava 中,由异常触发的意外错误将导致流终止,底层异常通过 onError()
回调传递给观察者。
作为一个小演示,假设以下设置:
final Single<String> a1 = Single.just("Batch-A-Operation-1");
final Single<String> a2 = Single.just("Batch-A-Operation-2");
final Single<String> a3 = Single.just("Batch-A-Operation-3");
final Single<String> b1 = Single.just("Batch-B-Operation-1");
final Single<String> b2 = Single.just("Batch-B-Operation-2");
final Single<String> b3 = Single.just("Batch-B-Operation-3");
final Single<String> c1 = Single.just("Batch-C-Operation-1");
final Single<String> c2 = Single.just("Batch-C-Operation-2");
final Single<String> c3 = Single.just("Batch-C-Operation-3");
每个Single
代表一个要执行的离散操作,它们根据一些逻辑分组进行逻辑命名(即它们应该一起执行)。例如,“Batch-A”对应您的“myList1”,“Batch-B”对应您的“myList2”,...
假定以下流:
Single
.zip(a1, a2, a3, (s, s2, s3) -> {
return "A's completed successfully";
})
.flatMap((Function<String, SingleSource<String>>) s -> {
throw new RuntimeException("B's failed");
})
.flatMap((Function<String, SingleSource<String>>) s -> {
return Single.zip(c1, c2, c3, (one, two, three) -> "C's completed successfully");
})
.subscribe(
s -> System.out.println("## onSuccess(" + s + ")"),
t -> System.out.println("## onError(" + t.getMessage() + ")")
);
(如果您不熟悉,可以使用 zip() 运算符组合作为输入提供的所有源的结果以发出 another/new 源)。
在这个流中,因为B的处理最终抛出异常:
- 流在 B 的执行期间终止
- 异常被报告给观察者(即
onError()
处理程序被触发) - C 从未被处理
但是,如果您想要自己决定是否执行每个分支,您可以采取的一种方法是使用某种状态持有者将先前操作的结果向下传递,如下所示:
class State {
final String value;
final Throwable error;
State(String value, Throwable error) {
this.value = value;
this.error = error;
}
}
然后可以修改流以有条件地执行不同的批处理,例如:
Single
.zip(a1, a2, a3, (s, s2, s3) -> {
try {
// Execute the A's here...
return new State("A's completed successfully", null);
} catch(Throwable t) {
return new State(null, t);
}
})
.flatMap((Function<State, SingleSource<State>>) s -> {
if(s.error != null) {
// If an error occurred upstream, skip this batch...
return Single.just(s);
} else {
try {
// ...otherwise, execute the B's
return Single.just(new State("B's completed successfully", null));
} catch(Throwable t) {
return Single.just(new State(null, t));
}
}
})
.flatMap((Function<State, SingleSource<State>>) s -> {
if(s.error != null) {
// If an error occurred upstream, skip this batch...
return Single.just(s);
} else {
try {
// ...otherwise, execute the C's
return Single.just(new State("C's completed successfully", null));
} catch(Throwable t) {
return Single.just(new State(null, t));
}
}
})
.subscribe(
s -> {
if(s.error != null) {
System.out.println("## onSuccess with error: " + s.error.getMessage());
} else {
System.out.println("## onSuccess without error: " + s.value);
}
},
t -> System.out.println("## onError(" + t.getMessage() + ")")
);
在对 Vertx 源代码进行一些研究之后,我发现了一个 public 方法,CompositeFuture 的 rx 版本使用该方法将 'traditional' CompositeFuture 转换为其 rx 版本。方法是io.vertx.reactivex.core.CompositeFuture.newInstance。通过这种解决方法,我可以使用我的传统方法,然后将其转换为在 rx 链中使用。这就是我想要的,因为改变现有的传统方法是有问题的。
这是带注释的代码:
rxGetConfig(vertx)
.flatMap(config -> {
return rxComposeAny(vertx, config)
.flatMap(r -> rxComposeAny(vertx, config))
.flatMap(r -> rxComposeAll(vertx, config));
})
.subscribe(
compositeFuture -> {
compositeFuture.onSuccess(event -> startPromise.complete());
},
error -> startPromise.fail(error));
public static Single<JsonObject> rxGetConfig(Vertx vertx) {
ConfigRetrieverOptions enrichConfigRetrieverOptions = getEnrichConfigRetrieverOptions();
// the reason we create new vertx is just to get an instance that is rx
// so this ConfigRetriever is from io.vertx.reactivex.config, instead of normal io.vertx.config
ConfigRetriever configRetriever = ConfigRetriever.create(io.vertx.reactivex.core.Vertx.newInstance(vertx), enrichConfigRetrieverOptions);
return configRetriever.rxGetConfig();
}
public static Single<io.vertx.reactivex.core.CompositeFuture> rxComposeAny(Vertx vertx, JsonObject config) {
// instead of adapted all the parameters of myMethodsThatReturnsFutures to be rx compliant,
// we create it 'normally' and the converts bellow to rx CompositeFuture
CompositeFuture compositeFuture = CompositeFuture.any(myMethodsThatReturnsFutures(config));
return io.vertx.reactivex.core.CompositeFuture
.newInstance(compositeFuture)
.rxOnComplete();
}