RxJava return 错误为 onNext 并继续流
RxJava return error as onNext and continue stream
所以我尝试使用 onErrorReturn
来 return 我想要的结果,但它会在之后完成流,我如何捕获错误 return as Next 并继续流?
使用下面的代码,出现错误时它不会到达 retryWhen
,如果我翻转它,如果出现错误
,它不会重新订阅 retryWhen
fun process(): Observable<State> {
return publishSubject
.flatMap { intent ->
actAsRepo(intent) // Might return error
.map { State(data = it, error = null) }
}
.onErrorReturn { State(data = "", error = it) } // catch the error
.retryWhen { errorObs ->
errorObs.flatMap {
Observable.just(State.defaultState()) // continue subscribing
}
}
}
private fun actAsRepo(string: String): Observable<String> {
if (string.contains('A')) {
throw IllegalArgumentException("Contains A")
} else {
return Observable.just("Wrapped from repo: $string")
}
}
订阅者将是
viewModel.process().subscribe(this::render)
onError 是终端操作员。如果发生 onError,它将在操作员之间传递。您可以使用 onError 运算符来捕获 onError 并提供回退。
在您的示例中,onError 发生在 flatMap 的内部流中。 onError 将向下游传播到 onErrorReturn 运算符。如果您查看实现,您会看到将调用 onErrorReturn lambda,结果将在 onComplete
之后通过 onNext 推送到下游
@Override
public void onError(Throwable t) {
T v;
try {
v = valueSupplier.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}
if (v == null) {
NullPointerException e = new NullPointerException("The supplied value is null");
e.initCause(t);
downstream.onError(e);
return;
}
downstream.onNext(v); // <--------
downstream.onComplete(); // <--------
}
你的解决方案的结果是什么?
您的流完成是因为:#retryWhen JavaDoc
If the upstream to the operator is asynchronous, signalling onNext followed by onComplete immediately may result in the sequence to be completed immediately. Similarly, if this inner {@code ObservableSource} signals {@code onError} or {@code onComplete} while the upstream is active, the sequence is terminated with the same signal immediately.
你应该做什么:
将onErrorReturn 放在flatMap 的地图操作符后面。使用此顺序,当 inner-flatMap 流 onErrors.
时,您的流将无法完成
这是为什么?
当外部(来源:publishSubject)和内部流(订阅)都完成时,flatMap 运算符完成。在这种情况下,外部流 (publishSubject) 发出 onNext,内部流将在通过 onNext 发送 {State(data = "", error = it) } 后完成。因此,流将保持打开状态。
interface ApiCall {
fun call(s: String): Observable<String>
}
class ApiCallImpl : ApiCall {
override fun call(s: String): Observable<String> {
// important: warp call into observable, that the exception is caught and emitted as onError downstream
return Observable.fromCallable {
if (s.contains('A')) {
throw IllegalArgumentException("Contains A")
} else {
s
}
}
}
}
data class State(val data: String, val err: Throwable? = null)
apiCallImpl.call 将 return 一个懒惰的可观察对象,它会在订阅时抛出错误,而不是在可观察的组装时间。
// no need for retryWhen here, except you want to catch onComplete from the publishSubject, but once the publishSubject completes no re-subscription will help you, because the publish-subject is terminated and onNext invocations will not be accepted anymore (see implementation).
fun process(): Observable<State> {
return publishSubject
.flatMap { intent ->
apiCallImpl.call(intent) // Might return error
.map { State(data = it, err = null) }
.onErrorReturn { State("", err = it) }
}
}
测试
lateinit var publishSubject: PublishSubject<String>
lateinit var apiCallImpl: ApiCallImpl
@Before
fun init() {
publishSubject = PublishSubject.create()
apiCallImpl = ApiCallImpl()
}
@Test
fun myTest() {
val test = process().test()
publishSubject.onNext("test")
publishSubject.onNext("A")
publishSubject.onNext("test2")
test.assertNotComplete()
.assertNoErrors()
.assertValueCount(3)
.assertValueAt(0) {
assertThat(it).isEqualTo(State("test", null))
true
}
.assertValueAt(1) {
assertThat(it.data).isEmpty()
assertThat(it.err).isExactlyInstanceOf(IllegalArgumentException::class.java)
true
}
.assertValueAt(2) {
assertThat(it).isEqualTo(State("test2", null))
true
}
}
备选
此替代方案的行为与第一个解决方案略有不同。 flatMap-Operator 采用布尔值 (delayError),这将导致吞下 onError 消息,直到源完成。当源完成时,将发出错误。
您可以使用 delayError true,当异常没有用并且不能在出现时记录下来
进程
fun process(): Observable<State> {
return publishSubject
.flatMap({ intent ->
apiCallImpl.call(intent)
.map { State(data = it, err = null) }
}, true)
}
测试
只发出了两个值。错误不会转换为回退值。
@Test
fun myTest() {
val test = process().test()
publishSubject.onNext("test")
publishSubject.onNext("A")
publishSubject.onNext("test2")
test.assertNotComplete()
.assertNoErrors()
.assertValueAt(0) {
assertThat(it).isEqualTo(State("test", null))
true
}
.assertValueAt(1) {
assertThat(it).isEqualTo(State("test2", null))
true
}
.assertValueCount(2)
}
注意:我想你想在这种情况下使用 switchMap,而不是 flatMap。
所以我尝试使用 onErrorReturn
来 return 我想要的结果,但它会在之后完成流,我如何捕获错误 return as Next 并继续流?
使用下面的代码,出现错误时它不会到达 retryWhen
,如果我翻转它,如果出现错误
retryWhen
fun process(): Observable<State> {
return publishSubject
.flatMap { intent ->
actAsRepo(intent) // Might return error
.map { State(data = it, error = null) }
}
.onErrorReturn { State(data = "", error = it) } // catch the error
.retryWhen { errorObs ->
errorObs.flatMap {
Observable.just(State.defaultState()) // continue subscribing
}
}
}
private fun actAsRepo(string: String): Observable<String> {
if (string.contains('A')) {
throw IllegalArgumentException("Contains A")
} else {
return Observable.just("Wrapped from repo: $string")
}
}
订阅者将是
viewModel.process().subscribe(this::render)
onError 是终端操作员。如果发生 onError,它将在操作员之间传递。您可以使用 onError 运算符来捕获 onError 并提供回退。
在您的示例中,onError 发生在 flatMap 的内部流中。 onError 将向下游传播到 onErrorReturn 运算符。如果您查看实现,您会看到将调用 onErrorReturn lambda,结果将在 onComplete
之后通过 onNext 推送到下游 @Override
public void onError(Throwable t) {
T v;
try {
v = valueSupplier.apply(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
downstream.onError(new CompositeException(t, e));
return;
}
if (v == null) {
NullPointerException e = new NullPointerException("The supplied value is null");
e.initCause(t);
downstream.onError(e);
return;
}
downstream.onNext(v); // <--------
downstream.onComplete(); // <--------
}
你的解决方案的结果是什么?
您的流完成是因为:#retryWhen JavaDoc
If the upstream to the operator is asynchronous, signalling onNext followed by onComplete immediately may result in the sequence to be completed immediately. Similarly, if this inner {@code ObservableSource} signals {@code onError} or {@code onComplete} while the upstream is active, the sequence is terminated with the same signal immediately.
你应该做什么:
将onErrorReturn 放在flatMap 的地图操作符后面。使用此顺序,当 inner-flatMap 流 onErrors.
时,您的流将无法完成这是为什么?
当外部(来源:publishSubject)和内部流(订阅)都完成时,flatMap 运算符完成。在这种情况下,外部流 (publishSubject) 发出 onNext,内部流将在通过 onNext 发送 {State(data = "", error = it) } 后完成。因此,流将保持打开状态。
interface ApiCall {
fun call(s: String): Observable<String>
}
class ApiCallImpl : ApiCall {
override fun call(s: String): Observable<String> {
// important: warp call into observable, that the exception is caught and emitted as onError downstream
return Observable.fromCallable {
if (s.contains('A')) {
throw IllegalArgumentException("Contains A")
} else {
s
}
}
}
}
data class State(val data: String, val err: Throwable? = null)
apiCallImpl.call 将 return 一个懒惰的可观察对象,它会在订阅时抛出错误,而不是在可观察的组装时间。
// no need for retryWhen here, except you want to catch onComplete from the publishSubject, but once the publishSubject completes no re-subscription will help you, because the publish-subject is terminated and onNext invocations will not be accepted anymore (see implementation).
fun process(): Observable<State> {
return publishSubject
.flatMap { intent ->
apiCallImpl.call(intent) // Might return error
.map { State(data = it, err = null) }
.onErrorReturn { State("", err = it) }
}
}
测试
lateinit var publishSubject: PublishSubject<String>
lateinit var apiCallImpl: ApiCallImpl
@Before
fun init() {
publishSubject = PublishSubject.create()
apiCallImpl = ApiCallImpl()
}
@Test
fun myTest() {
val test = process().test()
publishSubject.onNext("test")
publishSubject.onNext("A")
publishSubject.onNext("test2")
test.assertNotComplete()
.assertNoErrors()
.assertValueCount(3)
.assertValueAt(0) {
assertThat(it).isEqualTo(State("test", null))
true
}
.assertValueAt(1) {
assertThat(it.data).isEmpty()
assertThat(it.err).isExactlyInstanceOf(IllegalArgumentException::class.java)
true
}
.assertValueAt(2) {
assertThat(it).isEqualTo(State("test2", null))
true
}
}
备选
此替代方案的行为与第一个解决方案略有不同。 flatMap-Operator 采用布尔值 (delayError),这将导致吞下 onError 消息,直到源完成。当源完成时,将发出错误。
您可以使用 delayError true,当异常没有用并且不能在出现时记录下来
进程
fun process(): Observable<State> {
return publishSubject
.flatMap({ intent ->
apiCallImpl.call(intent)
.map { State(data = it, err = null) }
}, true)
}
测试
只发出了两个值。错误不会转换为回退值。
@Test
fun myTest() {
val test = process().test()
publishSubject.onNext("test")
publishSubject.onNext("A")
publishSubject.onNext("test2")
test.assertNotComplete()
.assertNoErrors()
.assertValueAt(0) {
assertThat(it).isEqualTo(State("test", null))
true
}
.assertValueAt(1) {
assertThat(it).isEqualTo(State("test2", null))
true
}
.assertValueCount(2)
}
注意:我想你想在这种情况下使用 switchMap,而不是 flatMap。