如何在 RxJava2 中静默跳过异常?
How to skip exceptions silently in RxJava2?
我有这样的数据流:
Observable
.fromFuture(
CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>>
listOf(1, 2, 3, 57005, 5)
},
Schedulers.computation()
)
.flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one
.map {
CompletableFuture.supplyAsync { // This remote call may fail if it does not like the input. I want to skip that failures and continue the stream like the fail never occurred.
if (it == 0xDEAD) {
throw IOException("Dead value!")
}
it
}
}
.flatMap {
Observable.fromFuture(it) // Turn that Futures into a stream of Observables once again
}
.doOnNext {
println(it) // Debug
}
.blockingSubscribe()
我已经用 CompletableFuture.supplyAsync
替换了业务逻辑(实际上是 return Future
s)。
而且,是的,这是 Kotlin,但我猜你明白了。
当我评论 "dead" 值(57005
、0xDEAD
)时,输出是:
1
4
9
25
但如果 "dead" 值出现在流中,则失败:
1
4
9
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: Dead value!
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45)
at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:86)
at io.reactivex.Observable.blockingSubscribe(Observable.java:5035)
at by.dev.madhead.rx.TestKt.main(test.kt:41)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Dead value!
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
...
我是 RX 的新手,所以很快就用谷歌搜索了解决方案:onExceptionResumeNext
:Observable.fromFuture(it)
--> Observable.fromFuture(it).onExceptionResumeNext { Observable.empty<Int>() }
。但是现在我的应用程序永远挂起(在产生我期望的输出之后)。
看起来流永远不会结束。
我应该 "shutdown" 那 Observable
不知何故还是什么?
或者,更一般地说,它是使用 RX 的好方法吗?
我应该换个方式重新考虑吗?
像这样吞下异常:
Observable.fromFuture(it).onErrorResumeNext(Observable.empty())
我有这样的数据流:
Observable
.fromFuture(
CompletableFuture.supplyAsync { // First remote call returns Future<List<Type>>
listOf(1, 2, 3, 57005, 5)
},
Schedulers.computation()
)
.flatMap { it.toObservable() } // I turn that list into a stream of single values to process them one by one
.map {
CompletableFuture.supplyAsync { // This remote call may fail if it does not like the input. I want to skip that failures and continue the stream like the fail never occurred.
if (it == 0xDEAD) {
throw IOException("Dead value!")
}
it
}
}
.flatMap {
Observable.fromFuture(it) // Turn that Futures into a stream of Observables once again
}
.doOnNext {
println(it) // Debug
}
.blockingSubscribe()
我已经用 CompletableFuture.supplyAsync
替换了业务逻辑(实际上是 return Future
s)。
而且,是的,这是 Kotlin,但我猜你明白了。
当我评论 "dead" 值(57005
、0xDEAD
)时,输出是:
1
4
9
25
但如果 "dead" 值出现在流中,则失败:
1
4
9
Exception in thread "main" java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.io.IOException: Dead value!
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45)
at io.reactivex.internal.operators.observable.ObservableBlockingSubscribe.subscribe(ObservableBlockingSubscribe.java:86)
at io.reactivex.Observable.blockingSubscribe(Observable.java:5035)
at by.dev.madhead.rx.TestKt.main(test.kt:41)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Dead value!
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
...
我是 RX 的新手,所以很快就用谷歌搜索了解决方案:onExceptionResumeNext
:Observable.fromFuture(it)
--> Observable.fromFuture(it).onExceptionResumeNext { Observable.empty<Int>() }
。但是现在我的应用程序永远挂起(在产生我期望的输出之后)。
看起来流永远不会结束。
我应该 "shutdown" 那 Observable
不知何故还是什么?
或者,更一般地说,它是使用 RX 的好方法吗?
我应该换个方式重新考虑吗?
像这样吞下异常:
Observable.fromFuture(it).onErrorResumeNext(Observable.empty())