Vertx Future 不等待
Vertx Future does not wait
因为我在我的堆栈中使用 Vertx 3.1,所以我想使用这些工具带来的 Future 功能,但在阅读之后 API 似乎对我来说非常有限。我什至找不到让未来等待 Observable 的方法。
这是我的代码
public Observable<CommitToOrderCommand> validateProductRestrictions(CommitToOrderCommand cmd) {
Future<Observable<CommitToOrderCommand>> future = Future.future();
orderRepository.getOrder(cmd, cmd.orderId)
.flatMap(order -> validateOrderProducts(cmd, order))
.subscribe(map -> checkMapValues(map, future, cmd));
Observable<CommitToOrderCommand> result = future.result();
if(errorFound){
throw MAX_QUANTITY_PRODUCT_EXCEED.create("Fail"/*restrictions.getBulkBuyLimit().getDescription())*/);
}
return result;
}
private void checkMapValues(Multimap<String, BigDecimal> totalUnitByRestrictions, Future<Observable<CommitToOrderCommand>> future,
CommitToOrderCommand cmd) {
for (String restrictionName : totalUnitByRestrictions.keySet()) {
Restrictions restrictions = Restrictions.valueOf(restrictionName);
if (totalUnitByRestrictions.get(restrictionName)
.stream()
.reduce(BigDecimal.ZERO, BigDecimal::add)
.compareTo(restrictions.getBulkBuyLimit()
.getMaxQuantity()) == 1) {
errorFound = true;
}
}
future.complete(Observable.just(cmd));
}
在我的第一个 Observable 的 onComplete 中,我检查了结果,完成之后是我完成 future 以解锁操作的时间。
但我正在寻找 future.result 不会阻塞,直到 future.complete 像我预期的那样被调用。相反只是返回 null。
知道这里出了什么问题吗?
此致。
vertx future 不会阻塞,而是与注入结果时调用的处理程序一起工作(参见 setHandler
和 isComplete
)。
如果外层代码需要一个Observable,不需要用Future包裹,return Observable<T>
即可。 Future<Observable<T>>
没有多大意义,您混合了两种处理异步结果的方法。
请注意,有一些方法可以将 Observable 折叠成 Future,但困难在于 Observable 可能会发出多个项目,而 Future 只能包含一个项目。您已经通过将您的结果收集到单个地图发射中来解决这个问题。
因为这个 Observable
只发出一个项目,如果你想要一个 Future
从它你应该 subscribe
它并调用 future.complete(yourMap)
在 onNext
方法。还要定义一个 onError
处理程序,它将调用 future.fail
.
因为我在我的堆栈中使用 Vertx 3.1,所以我想使用这些工具带来的 Future 功能,但在阅读之后 API 似乎对我来说非常有限。我什至找不到让未来等待 Observable 的方法。 这是我的代码
public Observable<CommitToOrderCommand> validateProductRestrictions(CommitToOrderCommand cmd) {
Future<Observable<CommitToOrderCommand>> future = Future.future();
orderRepository.getOrder(cmd, cmd.orderId)
.flatMap(order -> validateOrderProducts(cmd, order))
.subscribe(map -> checkMapValues(map, future, cmd));
Observable<CommitToOrderCommand> result = future.result();
if(errorFound){
throw MAX_QUANTITY_PRODUCT_EXCEED.create("Fail"/*restrictions.getBulkBuyLimit().getDescription())*/);
}
return result;
}
private void checkMapValues(Multimap<String, BigDecimal> totalUnitByRestrictions, Future<Observable<CommitToOrderCommand>> future,
CommitToOrderCommand cmd) {
for (String restrictionName : totalUnitByRestrictions.keySet()) {
Restrictions restrictions = Restrictions.valueOf(restrictionName);
if (totalUnitByRestrictions.get(restrictionName)
.stream()
.reduce(BigDecimal.ZERO, BigDecimal::add)
.compareTo(restrictions.getBulkBuyLimit()
.getMaxQuantity()) == 1) {
errorFound = true;
}
}
future.complete(Observable.just(cmd));
}
在我的第一个 Observable 的 onComplete 中,我检查了结果,完成之后是我完成 future 以解锁操作的时间。 但我正在寻找 future.result 不会阻塞,直到 future.complete 像我预期的那样被调用。相反只是返回 null。
知道这里出了什么问题吗?
此致。
vertx future 不会阻塞,而是与注入结果时调用的处理程序一起工作(参见 setHandler
和 isComplete
)。
如果外层代码需要一个Observable,不需要用Future包裹,return Observable<T>
即可。 Future<Observable<T>>
没有多大意义,您混合了两种处理异步结果的方法。
请注意,有一些方法可以将 Observable 折叠成 Future,但困难在于 Observable 可能会发出多个项目,而 Future 只能包含一个项目。您已经通过将您的结果收集到单个地图发射中来解决这个问题。
因为这个 Observable
只发出一个项目,如果你想要一个 Future
从它你应该 subscribe
它并调用 future.complete(yourMap)
在 onNext
方法。还要定义一个 onError
处理程序,它将调用 future.fail
.