用于可观察对象列表的 Rxjava 运算符并停止连接第一个完成的流
Rxjava operator for list of observables and stops concatenating on first completed stream
我有一个 observables 列表,其中一些很可能会在订阅时引发错误。我想将这些 observables 连接到一个流中并忽略所有引发的错误。如果出现错误,它应该开始订阅列表中的新流并继续,直到某个流完成。在这种情况下,它不应订阅其他流并完成。
例如,有一个包含三个可观察值的列表。
- 订阅第一个 -> 它引发了一个错误,所以我订阅了第二个
- 订阅第二个 -> 它完成且没有任何错误,因此发出 onCompleted
- 在第二个流成功完成时永远不会订阅第三个流
大理石:
--o---o----x--------------------
-------------o-------o-----|----
--------------------------------
--o---o------o-------o-----|----
我尝试使用 onerrorresumenext
,但不知道如何将其与 concat
运算符连接。
您确实需要 concatMap
,捕获中间源中的任何潜在错误,在操作员之间共享,然后在最后重新引入错误。
List<Observable<Integer>> sources = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int j = i;
if (i == 5) {
sources.add(Observable.just(i)
.doOnSubscribe(d -> System.out.println("Subscribed: " + j)));
} else {
sources.add(Observable.<Integer>error(new Exception("" + i))
.doOnSubscribe(d -> System.out.println("Subscribed: " + j))
);
}
}
// -------------------------------------------------------
Observable.defer(() -> {
AtomicReference<Throwable> lastError = new AtomicReference<>();
AtomicBoolean lastSuccessful = new AtomicReference<>();
return Observable.fromIterable(sources)
.takeWhile(t -> !lastSuccessful.get())
.concatMap(source -> {
return source
.doOnComplete(() -> lastSuccessful.set(true))
.doOnError(e -> lastError.set(e))
.onErrorComplete();
})
.concatWith(Observable.defer(() -> {
if (lastSuccessful.get()) {
return Observable.empty();
}
return Observable.error(lastError.get());
}));
})
.test()
.assertResult(5);
我有一个 observables 列表,其中一些很可能会在订阅时引发错误。我想将这些 observables 连接到一个流中并忽略所有引发的错误。如果出现错误,它应该开始订阅列表中的新流并继续,直到某个流完成。在这种情况下,它不应订阅其他流并完成。
例如,有一个包含三个可观察值的列表。
- 订阅第一个 -> 它引发了一个错误,所以我订阅了第二个
- 订阅第二个 -> 它完成且没有任何错误,因此发出 onCompleted
- 在第二个流成功完成时永远不会订阅第三个流
大理石:
--o---o----x--------------------
-------------o-------o-----|----
--------------------------------
--o---o------o-------o-----|----
我尝试使用 onerrorresumenext
,但不知道如何将其与 concat
运算符连接。
您确实需要 concatMap
,捕获中间源中的任何潜在错误,在操作员之间共享,然后在最后重新引入错误。
List<Observable<Integer>> sources = new ArrayList<>();
for (int i = 0; i < 10; i++) {
final int j = i;
if (i == 5) {
sources.add(Observable.just(i)
.doOnSubscribe(d -> System.out.println("Subscribed: " + j)));
} else {
sources.add(Observable.<Integer>error(new Exception("" + i))
.doOnSubscribe(d -> System.out.println("Subscribed: " + j))
);
}
}
// -------------------------------------------------------
Observable.defer(() -> {
AtomicReference<Throwable> lastError = new AtomicReference<>();
AtomicBoolean lastSuccessful = new AtomicReference<>();
return Observable.fromIterable(sources)
.takeWhile(t -> !lastSuccessful.get())
.concatMap(source -> {
return source
.doOnComplete(() -> lastSuccessful.set(true))
.doOnError(e -> lastError.set(e))
.onErrorComplete();
})
.concatWith(Observable.defer(() -> {
if (lastSuccessful.get()) {
return Observable.empty();
}
return Observable.error(lastError.get());
}));
})
.test()
.assertResult(5);