RxJava2 observable 抛出 UndeliverableException
RxJava2 observable take throws UndeliverableException
据我了解,RxJava2 values.take(1)
创建了另一个 Observable,它只包含来自原始 Observable 的一个元素。 绝不能 抛出异常,因为它已被 take(1)
的效果过滤掉,因为它是第二次发生的。
如以下代码段
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onError(new Exception("Oops"));
});
values.take(1)
.subscribe(
System.out::println,
e -> System.out.println("Error: " + e.getMessage()),
() -> System.out.println("Completed")
);
输出
1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main[=11=](ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main[=11=](ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
我的问题:
- 我的理解正确吗?
- 导致异常的真正原因是什么。
- 消费者如何解决这个问题?
- 是的,但是因为可观察的'ends'并不意味着
create(...)
里面的代码运行停止了。为了在这种情况下完全安全,您需要使用 o.isDisposed()
查看 observable 是否已在下游结束。
- 例外是因为 RxJava 2 的策略是永远不允许
onError
调用丢失。如果 observable 已经终止,它要么被传递到下游,要么作为全局 UndeliverableException
抛出。由 Observable 的创建者'properly' 处理 Observable 结束并发生异常的情况。
- 问题是生产者 (
Observable
) 和消费者 (Subscriber
) 对流何时结束意见不一。由于在这种情况下生产者的寿命比消费者长,因此只能在生产者中解决问题。
@Kiskae 在之前的评论中正确回答了出现此类异常的原因。
此处 link 关于此主题的官方文档:RxJava2-wiki。
有时您无法更改此行为,因此有一种方法可以处理此 UndeliverableException
。以下是如何避免崩溃和不当行为的代码片段:
RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
e = e.getCause();
}
if ((e instanceof IOException) || (e instanceof SocketException)) {
// fine, irrelevant network problem or API that throws on cancellation
return;
}
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
// that's likely a bug in the application
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
if (e instanceof IllegalStateException) {
// that's a bug in RxJava or in a custom operator
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
Log.warning("Undeliverable exception received, not sure what to do", e);
});
此代码取自上面的 link。
重要说明。这种方法将全局错误处理程序设置为 RxJava,所以如果你能摆脱这些异常 - 这将是更好的选择。
在使用 observable.create() 时,只需使用 tryOnError()。 onError() 不保证错误会得到处理。还有各种error handling operators are there HERE
Kotlin
我在 MainActivity 的 onCreate 方法中调用这个
private fun initRxErrorHandler(){
RxJavaPlugins.setErrorHandler { throwable ->
if (throwable is UndeliverableException) {
throwable.cause?.let {
Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), it)
return@setErrorHandler
}
}
if (throwable is IOException || throwable is SocketException) {
// fine, irrelevant network problem or API that throws on cancellation
return@setErrorHandler
}
if (throwable is InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return@setErrorHandler
}
if (throwable is NullPointerException || throwable is IllegalArgumentException) {
// that's likely a bug in the application
Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
return@setErrorHandler
}
if (throwable is IllegalStateException) {
// that's a bug in RxJava or in a custom operator
Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
return@setErrorHandler
}
Log.w("Undeliverable exception", throwable)
}
}
据我了解,RxJava2 values.take(1)
创建了另一个 Observable,它只包含来自原始 Observable 的一个元素。 绝不能 抛出异常,因为它已被 take(1)
的效果过滤掉,因为它是第二次发生的。
如以下代码段
Observable<Integer> values = Observable.create(o -> {
o.onNext(1);
o.onError(new Exception("Oops"));
});
values.take(1)
.subscribe(
System.out::println,
e -> System.out.println("Error: " + e.getMessage()),
() -> System.out.println("Completed")
);
输出
1
Completed
io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main[=11=](ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
Exception in thread "main" io.reactivex.exceptions.UndeliverableException: java.lang.Exception: Oops
at io.reactivex.plugins.RxJavaPlugins.onError(RxJavaPlugins.java:366)
at io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter.onError(ObservableCreate.java:83)
at ch02.lambda$main[=11=](ch02.java:28)
at io.reactivex.internal.operators.observable.ObservableCreate.subscribeActual(ObservableCreate.java:40)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.internal.operators.observable.ObservableTake.subscribeActual(ObservableTake.java:30)
at io.reactivex.Observable.subscribe(Observable.java:10841)
at io.reactivex.Observable.subscribe(Observable.java:10827)
at io.reactivex.Observable.subscribe(Observable.java:10787)
at ch02.main(ch02.java:32)
Caused by: java.lang.Exception: Oops
... 8 more
我的问题:
- 我的理解正确吗?
- 导致异常的真正原因是什么。
- 消费者如何解决这个问题?
- 是的,但是因为可观察的'ends'并不意味着
create(...)
里面的代码运行停止了。为了在这种情况下完全安全,您需要使用o.isDisposed()
查看 observable 是否已在下游结束。 - 例外是因为 RxJava 2 的策略是永远不允许
onError
调用丢失。如果 observable 已经终止,它要么被传递到下游,要么作为全局UndeliverableException
抛出。由 Observable 的创建者'properly' 处理 Observable 结束并发生异常的情况。 - 问题是生产者 (
Observable
) 和消费者 (Subscriber
) 对流何时结束意见不一。由于在这种情况下生产者的寿命比消费者长,因此只能在生产者中解决问题。
@Kiskae 在之前的评论中正确回答了出现此类异常的原因。
此处 link 关于此主题的官方文档:RxJava2-wiki。
有时您无法更改此行为,因此有一种方法可以处理此 UndeliverableException
。以下是如何避免崩溃和不当行为的代码片段:
RxJavaPlugins.setErrorHandler(e -> {
if (e instanceof UndeliverableException) {
e = e.getCause();
}
if ((e instanceof IOException) || (e instanceof SocketException)) {
// fine, irrelevant network problem or API that throws on cancellation
return;
}
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
if ((e instanceof NullPointerException) || (e instanceof IllegalArgumentException)) {
// that's likely a bug in the application
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
if (e instanceof IllegalStateException) {
// that's a bug in RxJava or in a custom operator
Thread.currentThread().getUncaughtExceptionHandler()
.handleException(Thread.currentThread(), e);
return;
}
Log.warning("Undeliverable exception received, not sure what to do", e);
});
此代码取自上面的 link。
重要说明。这种方法将全局错误处理程序设置为 RxJava,所以如果你能摆脱这些异常 - 这将是更好的选择。
在使用 observable.create() 时,只需使用 tryOnError()。 onError() 不保证错误会得到处理。还有各种error handling operators are there HERE
Kotlin
我在 MainActivity 的 onCreate 方法中调用这个
private fun initRxErrorHandler(){
RxJavaPlugins.setErrorHandler { throwable ->
if (throwable is UndeliverableException) {
throwable.cause?.let {
Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), it)
return@setErrorHandler
}
}
if (throwable is IOException || throwable is SocketException) {
// fine, irrelevant network problem or API that throws on cancellation
return@setErrorHandler
}
if (throwable is InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return@setErrorHandler
}
if (throwable is NullPointerException || throwable is IllegalArgumentException) {
// that's likely a bug in the application
Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
return@setErrorHandler
}
if (throwable is IllegalStateException) {
// that's a bug in RxJava or in a custom operator
Thread.currentThread().uncaughtExceptionHandler?.uncaughtException(Thread.currentThread(), throwable)
return@setErrorHandler
}
Log.w("Undeliverable exception", throwable)
}
}