用于可观察对象列表的 Rxjava 运算符并停止连接第一个完成的流

Rxjava operator for list of observables and stops concatenating on first completed stream

我有一个 observables 列表,其中一些很可能会在订阅时引发错误。我想将这些 observables 连接到一个流中并忽略所有引发的错误。如果出现错误,它应该开始订阅列表中的新流并继续,直到某个流完成。在这种情况下,它不应订阅其他流并完成。

例如,有一个包含三个可观察值的列表。

大理石:

--o---o----x--------------------
-------------o-------o-----|----
--------------------------------
  
--o---o------o-------o-----|----

我尝试使用 onerrorresumenext,但不知道如何将其与 concat 运算符连接。

您确实需要 concatMap,捕获中间源中的任何潜在错误,在操作员之间共享,然后在最后重新引入错误。

List<Observable<Integer>> sources = new ArrayList<>();

for (int i = 0; i < 10; i++) {
    final int j = i;
    if (i == 5) {
        sources.add(Observable.just(i)
                .doOnSubscribe(d -> System.out.println("Subscribed: " + j)));
    } else {
        sources.add(Observable.<Integer>error(new Exception("" + i))
                .doOnSubscribe(d -> System.out.println("Subscribed: " + j))
        );
    }
}
// -------------------------------------------------------

Observable.defer(() -> {
    AtomicReference<Throwable> lastError = new AtomicReference<>();
    AtomicBoolean lastSuccessful = new AtomicReference<>();
    return Observable.fromIterable(sources)
                     .takeWhile(t -> !lastSuccessful.get())
                     .concatMap(source -> {
                         return source
                                .doOnComplete(() -> lastSuccessful.set(true))
                                .doOnError(e -> lastError.set(e))
                                .onErrorComplete();
                     })
                     .concatWith(Observable.defer(() -> {
                         if (lastSuccessful.get()) {
                             return Observable.empty();
                         }
                         return Observable.error(lastError.get());
                     }));

})
.test()
.assertResult(5);