RxJava 2 出错后如何继续处理?
How to continue processing after an error happens in RxJava 2?
我有一个 PublishSubject
和一个 Subscriber
,我用它们来处理(可能)无限的预处理数据流。问题是某些元素可能包含一些错误。我想忽略它们并继续处理。我该怎么做?我试过这样的事情:
val subject = PublishSubject.create<String>()
subject.retry().subscribe({
println("next: $it")
}, {
println("error")
}, {
println("complete")
})
subject.onNext("foo")
subject.onNext("bar")
subject.onError(RuntimeException())
subject.onNext("wom")
subject.onComplete()
我的问题是 none 的错误处理方法在这里帮助我:
onErrorResumeNext()
— instructs an Observable to emit a sequence of
items if it encounters an error
onErrorReturn( )
— instructs an
Observable to emit a particular item when it encounters an error
onExceptionResumeNext( )
— instructs an Observable to continue
emitting items after it encounters an exception (but not another
variety of throwable)
retry( )
— if a source Observable emits an
error, resubscribe to it in the hopes that it will complete without
error
retryWhen( )
— if a source Observable emits an error, pass that
error to another Observable to determine whether to resubscribe to the
source
例如,我尝试了 retry()
,但它在错误发生后无限期挂起我的进程。
我也试过 onErrorResumeNext()
但它没有按预期工作:
val backupSubject = PublishSubject.create<String>()
val subject = PublishSubject.create<String>()
var currentSubject = subject
subject.onErrorResumeNext(backupSubject).subscribe({
println("next: $it")
}, {
println("error")
currentSubject = backupSubject
}, {
println("complete")
})
backupSubject.subscribe({
println("backup")
}, {
println("backup error")
})
currentSubject.onNext("foo")
currentSubject.onNext("bar")
currentSubject.onError(RuntimeException())
currentSubject.onNext("wom")
currentSubject.onComplete()
这只会打印 foo
和 bar
。
如果您想在出错后继续处理,这意味着您的错误是一个与您的 String
一样的值,应该经过 onNext
。在这种情况下,为了确保类型安全,您应该使用某种形式的包装器,它可以采用常规值或错误;例如,io.reactivex.Notification<T>
在 RxJava 2 中可用:
PublishSubject<Notification<String>> subject = PublishSubject.create();
subject.subscribe(System.out::println);
subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));
我有一个 PublishSubject
和一个 Subscriber
,我用它们来处理(可能)无限的预处理数据流。问题是某些元素可能包含一些错误。我想忽略它们并继续处理。我该怎么做?我试过这样的事情:
val subject = PublishSubject.create<String>()
subject.retry().subscribe({
println("next: $it")
}, {
println("error")
}, {
println("complete")
})
subject.onNext("foo")
subject.onNext("bar")
subject.onError(RuntimeException())
subject.onNext("wom")
subject.onComplete()
我的问题是 none 的错误处理方法在这里帮助我:
onErrorResumeNext()
— instructs an Observable to emit a sequence of items if it encounters an error
onErrorReturn( )
— instructs an Observable to emit a particular item when it encounters an error
onExceptionResumeNext( )
— instructs an Observable to continue emitting items after it encounters an exception (but not another variety of throwable)
retry( )
— if a source Observable emits an error, resubscribe to it in the hopes that it will complete without error
retryWhen( )
— if a source Observable emits an error, pass that error to another Observable to determine whether to resubscribe to the source
例如,我尝试了 retry()
,但它在错误发生后无限期挂起我的进程。
我也试过 onErrorResumeNext()
但它没有按预期工作:
val backupSubject = PublishSubject.create<String>()
val subject = PublishSubject.create<String>()
var currentSubject = subject
subject.onErrorResumeNext(backupSubject).subscribe({
println("next: $it")
}, {
println("error")
currentSubject = backupSubject
}, {
println("complete")
})
backupSubject.subscribe({
println("backup")
}, {
println("backup error")
})
currentSubject.onNext("foo")
currentSubject.onNext("bar")
currentSubject.onError(RuntimeException())
currentSubject.onNext("wom")
currentSubject.onComplete()
这只会打印 foo
和 bar
。
如果您想在出错后继续处理,这意味着您的错误是一个与您的 String
一样的值,应该经过 onNext
。在这种情况下,为了确保类型安全,您应该使用某种形式的包装器,它可以采用常规值或错误;例如,io.reactivex.Notification<T>
在 RxJava 2 中可用:
PublishSubject<Notification<String>> subject = PublishSubject.create();
subject.subscribe(System.out::println);
subject.onNext(Notification.createOnNext("Hello"));
subject.onNext(Notification.<String>createOnError(new RuntimeException("oops")));
subject.onNext(Notification.createOnNext("World"));