如何忽略错误并继续无限流?

How to ignore error and continue infinite stream?

我想知道如何忽略异常并继续无限流(在我的情况下是位置流)?

我正在获取当前用户位置(使用 Android-ReactiveLocation) and then sending them to my API (using Retrofit)。

在我的例子中,当网络调用期间发生异常时(例如超时)onError 方法被调用并且流自行停止。如何避免?

Activity:

private RestService mRestService;
private Subscription mSubscription;
private LocationRequest mLocationRequest = LocationRequest.create()
            .setPriority(LocationRequest.PRIORITY_HIGH_ACCURACY)
            .setInterval(100);
...
private void start() {
    mRestService = ...;
    ReactiveLocationProvider reactiveLocationProvider = new ReactiveLocationProvider(this);
    mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
            .buffer(50)
            .flatMap(locations -> mRestService.postLocations(locations)) // can throw exception
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe();
}

休息服务:

public interface RestService {
    @POST("/.../")
    Observable<Response> postLocations(@Body List<Location> locations);
}

您可能想使用 error handling operators 之一。

  • onErrorResumeNext( ) — 指示 Observable 在遇到错误时发出一系列项目
  • onErrorReturn( ) — 指示 Observable 在遇到错误时发出特定项目
  • onExceptionResumeNext( ) — 指示 Observable 在遇到异常后继续发射项目(但不是另一种类型的 throwable)
  • retry( ) — 如果源 Observable 发出错误,重新订阅它,希望它能无错误地完成
  • retryWhen( ) — 如果源 Observable 发出错误,将该错误传递给另一个 Observable 以确定是否重新订阅源

特别是 retryonExceptionResumeNext 在你的情况下看起来很有希望。

尝试在 Observable.defer 调用中调用其余服务。这样,对于每个调用,您将有机会使用它自己的 'onErrorResumeNext',并且错误不会导致您的主流完成。

reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
  .buffer(50)
  .flatMap(locations ->
    Observable.defer(() -> mRestService.postLocations(locations))
      .onErrorResumeNext(<SOME_DEFAULT_TO_REACT_TO>)
  )
........

该解决方案最初来自此线程 -> RxJava Observable and Subscriber for skipping exception?,但我认为它也适用于您的情况。

mRestService.postLocations(locations) 发出一个物品,然后完成。 如果发生错误,它会发出错误,从而完成流。

当您在 flatMap 中调用此方法时,错误继续到您的 "main" 流,然后您的流停止。

您可以做的是将您的错误转换为另一个项目(如此处所述:),但不是在您的主流上(因为我假设您已经尝试过了),而是在 mRestService.postLocations(locations).

这样,此调用将发出错误,该错误将转换为 item/another 可观察对象,然后完成。 (不调用 onError)。

在消费者看来,mRestService.postLocations(locations) 将发出一个项目,然后完成,就像一切都成功一样。

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
        .buffer(50)
        .flatMap(locations -> mRestService.postLocations(locations).onErrorReturn((e) -> Collections.emptyList()) // can't throw exception
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe();

只需粘贴@MikeN 的回答中的 link 信息以防丢失:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}

and use it close to the observable source because other operators may eagerly unsubscribe before that.

Observerable.create(connectToUnboundedStream()).lift(new OperatorSuppressError(log()).doOnNext(someStuff()).subscribe();

Note, however, that this suppresses the error delivery from the source. If any onNext in the chain after it throws an exception, it is still likely the source will be unsubscribed.

对解决方案 (@MikeN) 稍作修改,使有限流能够完成:

import rx.Observable.Operator;
import rx.functions.Action1;

public final class OperatorSuppressError<T> implements Operator<T, T> {
    final Action1<Throwable> onError;

    public OperatorSuppressError(Action1<Throwable> onError) {
        this.onError = onError;
    }

    @Override
    public Subscriber<? super T> call(final Subscriber<? super T> t1) {
        return new Subscriber<T>(t1) {

            @Override
            public void onNext(T t) {
                t1.onNext(t);
            }

            @Override
            public void onError(Throwable e) {
                onError.call(e);
                //this will allow finite streams to complete
                t1.onCompleted();
            }

            @Override
            public void onCompleted() {
                t1.onCompleted();
            }

        };
    }
}

添加我对这个问题的解决方案:

privider
    .compose(ignoreErrorsTransformer)
    .subscribe()

private final Observable.Transformer<ResultType, ResultType> ignoreErrorsTransformer =
        new Observable.Transformer<ResultType, ResultType>() {
            @Override
            public Observable<ResultType> call(Observable<ResultType> resultTypeObservable) {
                return resultTypeObservable
                        .materialize()
                        .filter(new Func1<Notification<ResultType>, Boolean>() {
                            @Override
                            public Boolean call(Notification<ResultType> resultTypeNotification) {
                                return !resultTypeNotification.isOnError();
                            }
                        })
                        .dematerialize();

            }
        };

如果您只想忽略 flatMap 中的错误而不返回元素,请执行以下操作:

flatMap(item -> 
    restService.getSomething(item).onErrorResumeNext(Observable.empty())
);

这是我用于忽略错误的 kotlin 扩展函数

fun <T> Observable<T>.ignoreErrors(errorHandler: (Throwable) -> Unit) =
    retryWhen { errors ->
        errors
            .doOnNext { errorHandler(it) }
            .map { 0 }
    }

这利用 retryWhen 无限期地重新订阅上游,同时仍然允许您以非终端方式处理错误。

这感觉很危险

使用 Rxjava2,我们可以调用带有 delayErrors 参数的重载平面图: flatmap javadoc

当它为真时:

exceptions from the current Flowable and all inner Publishers are delayed until all of them terminate if false, the first one signaling an exception will terminate the whole sequence immediately

这个答案可能有点晚了,但是如果有人偶然发现这个问题,可以使用 Jacke Wharton 提供的现成可用的 Relay 库,而不是重新发明轮子

https://github.com/JakeWharton/RxRelay

有很好的文档,但本质上,Relay 是一个 Subject,除了不能调用 onComplete 或 onError。

选项为:

行为中继

Relay that emits the most recent item it has observed and all subsequent observed items to each subscribed Observer.
    // observer will receive all events.
    BehaviorRelay<Object> relay = BehaviorRelay.createDefault("default");
    relay.subscribe(observer);
    relay.accept("one");
    relay.accept("two");
    relay.accept("three");

    // observer will receive the "one", "two" and "three" events, but not "zero"
    BehaviorRelay<Object> relay = BehaviorRelay.createDefault("default");
    relay.accept("zero");
    relay.accept("one");
    relay.subscribe(observer);
    relay.accept("two");
    relay.accept("three");

发布中继 中继,一旦观察者订阅,就会将所有随后观察到的项目发送给订阅者。

    PublishRelay<Object> relay = PublishRelay.create();
    // observer1 will receive all events
    relay.subscribe(observer1);
    relay.accept("one");
    relay.accept("two");
    // observer2 will only receive "three"
    relay.subscribe(observer2);
    relay.accept("three");

重播中继 中继缓冲它观察到的所有项目并将它们重播给任何订阅的观察者。

    ReplayRelay<Object> relay = ReplayRelay.create();
    relay.accept("one");
    relay.accept("two");
    relay.accept("three");
    // both of the following will get the events from above
    relay.subscribe(observer1);
    relay.subscribe(observer2);

您可以使用 onErrorComplete() 方法跳过错误

mSubscription = reactiveLocationProvider.getUpdatedLocation(mLocationRequest)
    .buffer(50)
    .flatMapMaybe(locations -> Maybe.just(mRestService.postLocations(locations).onErrorComplete()) // skip item
    .subscribeOn(Schedulers.newThread())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe();