在 rxJava 中压缩超过 9 个 Observable
Zip more than 9 Observables in rxJava
遇到需要进行10-12个小的并行查询并合并结果的情况。但是如果有一个 zip 方法可以让你组合最多 9 个 Observables,那么我不明白如何做更多。我尝试使用 zip 方法
public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
但是却出错了
java.lang.ClassCastException: java.util.ArrayList cannot be cast to io.reactivex.ObservableSource
尝试看起来像这样
List<Observable> list = new ArrayList<>();
list.add(orderRepository.getOne(54, "id"));
list.add(orderRepository.getTwo(54, "id"));
list.add(orderRepository.getThree(54, "id"));
list.add(orderRepository.getFour(54, "id"));
list.add(orderRepository.getFive());
list.add(orderRepository.getSix(54, "id"));
list.add(orderRepository.getSeven(54, "id"));
list.add(orderRepository.getEight());
list.add(orderRepository.getNine());
list.add(orderRepository.getTen(54, "id"));
list.add(orderRepository.getEleven(54, "id"));
Observable.fromIterable(list);
return Observable.zip(list,
new Function<Object[], OrderModel>() {
@Override
public OrderModel apply(Object[] objects) throws Exception {
Logger.trace("objects = ", objects);
return new OrderModel();
}
});
请举例说明如何在 Java 或 Kotlin 中执行此操作。
Observable one = Observable.zip(
orderRepository.getOne(54, "id"),
orderRepository.getTwo(54, "id"),
// Etc up to five (I think)
Function zipper
);
Observable two = Observable.zip(
one, orderRepository.getSix(54, "id"),
orderRepository.getSeven(54, "id"),
// Etc up to five,
Function zipper
);
冲洗并重复,直到您将所有可观察对象压缩在一起。
这里是一个如何使用 Iterable 的 Observable#zip 重载的例子。你得到一个 new Object[] 的结果,因为 Java generics does not Support new T[].
在此示例中,您将看到 T 的 Observable 列表,它将在 Observable#zip 中使用。拉链函数会将每个对象转换为字符串和 return 一个 T 列表。结果是 Observable>。
此外,我建议您确保给定的 Observable 列表不为空。当一个空的 Observable 列表被提供给 Observable#zip 时,它将立即完成而不会发出任何东西(感谢@akarnokd)。或者您可以只使用 Observable#switchIfEmpty 来提供后备值(例如空列表)
@Test
void x() {
Observable<String> z1 = Observable.just("1");
Observable<String> z2 = Observable.just("2");
Observable<String> z3 = Observable.just("3");
List<Observable<String>> observables = Arrays.asList(z1, z2, z3);
Observable<List<String>> zip =
Observable.zip(
observables,
objects -> {
List<String> resultList =
Stream.of(objects).map(o -> (String) o).collect(Collectors.toList());
return resultList;
});
zip.test()
.assertNoErrors()
.assertComplete()
.assertValueCount(1)
.assertValueAt(
0,
r -> {
assertThat(r).contains("1", "2", "3");
return true;
});
}
Observable.zip(Observable1 , Observable2 ,... Observable9 ,
Function9<return_value_observable1 , return_value_observable2 , .... ,
return_value_observable9 , ZipModel { ->
}
遇到需要进行10-12个小的并行查询并合并结果的情况。但是如果有一个 zip 方法可以让你组合最多 9 个 Observables,那么我不明白如何做更多。我尝试使用 zip 方法
public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
但是却出错了
java.lang.ClassCastException: java.util.ArrayList cannot be cast to io.reactivex.ObservableSource
尝试看起来像这样
List<Observable> list = new ArrayList<>();
list.add(orderRepository.getOne(54, "id"));
list.add(orderRepository.getTwo(54, "id"));
list.add(orderRepository.getThree(54, "id"));
list.add(orderRepository.getFour(54, "id"));
list.add(orderRepository.getFive());
list.add(orderRepository.getSix(54, "id"));
list.add(orderRepository.getSeven(54, "id"));
list.add(orderRepository.getEight());
list.add(orderRepository.getNine());
list.add(orderRepository.getTen(54, "id"));
list.add(orderRepository.getEleven(54, "id"));
Observable.fromIterable(list);
return Observable.zip(list,
new Function<Object[], OrderModel>() {
@Override
public OrderModel apply(Object[] objects) throws Exception {
Logger.trace("objects = ", objects);
return new OrderModel();
}
});
请举例说明如何在 Java 或 Kotlin 中执行此操作。
Observable one = Observable.zip(
orderRepository.getOne(54, "id"),
orderRepository.getTwo(54, "id"),
// Etc up to five (I think)
Function zipper
);
Observable two = Observable.zip(
one, orderRepository.getSix(54, "id"),
orderRepository.getSeven(54, "id"),
// Etc up to five,
Function zipper
);
冲洗并重复,直到您将所有可观察对象压缩在一起。
这里是一个如何使用 Iterable 的 Observable#zip 重载的例子。你得到一个 new Object[] 的结果,因为 Java generics does not Support new T[].
在此示例中,您将看到 T 的 Observable 列表,它将在 Observable#zip 中使用。拉链函数会将每个对象转换为字符串和 return 一个 T 列表。结果是 Observable>。
此外,我建议您确保给定的 Observable 列表不为空。当一个空的 Observable 列表被提供给 Observable#zip 时,它将立即完成而不会发出任何东西(感谢@akarnokd)。或者您可以只使用 Observable#switchIfEmpty 来提供后备值(例如空列表)
@Test
void x() {
Observable<String> z1 = Observable.just("1");
Observable<String> z2 = Observable.just("2");
Observable<String> z3 = Observable.just("3");
List<Observable<String>> observables = Arrays.asList(z1, z2, z3);
Observable<List<String>> zip =
Observable.zip(
observables,
objects -> {
List<String> resultList =
Stream.of(objects).map(o -> (String) o).collect(Collectors.toList());
return resultList;
});
zip.test()
.assertNoErrors()
.assertComplete()
.assertValueCount(1)
.assertValueAt(
0,
r -> {
assertThat(r).contains("1", "2", "3");
return true;
});
}
Observable.zip(Observable1 , Observable2 ,... Observable9 ,
Function9<return_value_observable1 , return_value_observable2 , .... ,
return_value_observable9 , ZipModel { ->
}