忽略 Observable 中的错误<Single<T>>

Ignore errors in an Observable<Single<T>>

我有一个 Observable<Single<T>>,我想将其转换为新的 Observable<T>,其中任何失败的 Single<T> 都会被忽略。

这是我的尝试:

public static <T> Observable<T> skipErrors(final Observable<Single<T>> xs) {
    Preconditions.checkNotNull(xs);
    return xs.flatMap(single -> single.map(Optional::of)
        .onErrorReturn(error -> Optional.empty())
        .flatMapObservable(optional ->
            optional.map(Observable::just).orElseGet(Observable::empty)));
}

基本上,它将每一次成功都包装在 Optional 中,并将每一次失败映射到 Optional.empty()。然后在 flatMapObservable 中过滤选项。

有没有更惯用的方法来做到这一点?


单元测试:

final Observable<Single<String>> observable = skipErrors(Observable.just(
    Single.error(new Exception()),
    Single.error(new Exception()),
    Single.just("Hello"),
    Single.error(new Exception()),
    Single.just("world"),
    Single.error(new Exception())));

final ImmutableList<String> expected = ImmutableList.of("Hello", "world");
final ImmutableList<String> actual = observable.toList()
    .blockingGet()
    .stream()
    .collect(ImmutableList.toImmutableList());

assertEquals(expected, actual);

我想你应该可以做得更简单一点:

public static <T> Observable<T> skipErrors(final Observable<Single<T>> singles) {
  Preconditions.checkNotNull(xs);
  return singles
    .flatMap(single -> single
        .toObservable()
        .onErrorResumeNext(Observable.empty())
    );
}