RxJava 2 出错后如何继续处理?

How to continue processing after an error happens in RxJava 2?

我有一个 PublishSubject 和一个 Subscriber,我用它们来处理(可能)无限的预处理数据流。问题是某些元素可能包含一些错误。我想忽略它们并继续处理。我该怎么做?我试过这样的事情:

    val subject = PublishSubject.create<String>()
    subject.retry().subscribe({
        println("next: $it")
    }, {
        println("error")
    }, {
        println("complete")
    })

    subject.onNext("foo")
    subject.onNext("bar")
    subject.onError(RuntimeException())
    subject.onNext("wom")
    subject.onComplete()

我的问题是 none 的错误处理方法在这里帮助我:

onErrorResumeNext() — instructs an Observable to emit a sequence of items if it encounters an error

onErrorReturn( ) — instructs an Observable to emit a particular item when it encounters an error

onExceptionResumeNext( ) — instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)

retry( ) — if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error

retryWhen( ) — if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source

例如,我尝试了 retry(),但它在错误发生后无限期挂起我的进程。

我也试过 onErrorResumeNext() 但它没有按预期工作:

    val backupSubject = PublishSubject.create<String>()
    val subject = PublishSubject.create<String>()
    var currentSubject = subject
    subject.onErrorResumeNext(backupSubject).subscribe({
        println("next: $it")
    }, {
        println("error")
        currentSubject = backupSubject
    }, {
        println("complete")
    })

    backupSubject.subscribe({
        println("backup")
    }, {
        println("backup error")
    })

    currentSubject.onNext("foo")
    currentSubject.onNext("bar")
    currentSubject.onError(RuntimeException())
    currentSubject.onNext("wom")
    currentSubject.onComplete()

这只会打印 foobar

如果您想在出错后继续处理,这意味着您的错误是一个与您的 String 一样的值,应该经过 onNext。在这种情况下,为了确保类型安全,您应该使用某种形式的包装器,它可以采用常规值或错误;例如,io.reactivex.Notification<T> 在 RxJava 2 中可用:

PublishSubject<Notification<String>> subject = PublishSubject.create();

subject.subscribe(System.out::println);

subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));