Observable 的 doOnError 正确位置
Observable's doOnError correct location
我是 Observers
的新手,我仍在努力弄清楚它们。我有以下代码:
observableKafka.getRealTimeEvents()
.filter(this::isTrackedAccount)
.filter(e -> LedgerMapper.isDepositOrClosedTrade((Transaction) e.getPayload()))
.map(ledgerMapper::mapLedgerTransaction)
.map(offerCache::addTransaction)
.filter(offer -> offer != null) // Offer may have been removed from cache since last check
.filter(Offer::isReady)
.doOnError(throwable -> {
LOG.info("Exception thrown on realtime events");
})
.forEach(awardChecker::awardFailOrIgnore);
getRealTimeEvents()
returns 一个 Observable<Event>
.
.doOnError
的位置重要吗?另外,在这段代码中添加多个调用它的效果是什么?我意识到我可以做到,所有这些都被调用了,但我不确定它的目的是什么。
doOn???
方法是为了副作用而存在的,比方说,处理并不是您的核心业务价值。日志记录是一个非常好的用途。
也就是说,有时您想对错误做一些更有意义的事情,例如重试或向用户显示消息等...对于这些情况,"rx" 方法是在 subscribe
呼叫.
doOnError
(以及其他 doOn
方法)将原来的 Observable
包装成一个新的并为其添加行为(显然围绕其 onError
方法) .这就是为什么你可以多次调用它的原因。能够在链中的任何位置调用它的另一个好处是,您可以访问本来对流的使用者(Subscriber
)隐藏的错误,例如,因为链中有重试。 ..
是的,确实如此。 doOnError
在特定点通过流时发生错误,因此如果 doOnError
之前的运算符抛出,您的操作将被调用。但是,如果您将 doOnError
放在更靠上的位置,它可能会或可能不会被调用,具体取决于链中的下游运算符。
给定
Observer<Object> ignore = new Observer<Object>() {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
}
@Override public void onNext(Object t) {
}
};
例如,以下代码将始终调用 doOnError:
Observable.<Object>error(new Exception()).doOnError(e -> log(e)).subscribe(ignore);
然而,这段代码不会:
Observable.just(1).doOnError(e -> log(e))
.flatMap(v -> Observable.<Integer>error(new Exception())).subscribe(ignore);
大多数运营商会反弹源自下游的异常。
如果您通过 onErrorResumeNext
或 onExceptionResumeNext
:
转换异常,则添加 multipe doOnError
是可行的
Observable.<Object>error(new RuntimeException())
.doOnError(e -> log(e))
.onErrorResumeNext(Observable.<Object>error(new IllegalStateException()))
.doOnError(e -> log(e)).subscribe(ignore);
否则,您将在链的多个位置记录相同的异常。
我是 Observers
的新手,我仍在努力弄清楚它们。我有以下代码:
observableKafka.getRealTimeEvents()
.filter(this::isTrackedAccount)
.filter(e -> LedgerMapper.isDepositOrClosedTrade((Transaction) e.getPayload()))
.map(ledgerMapper::mapLedgerTransaction)
.map(offerCache::addTransaction)
.filter(offer -> offer != null) // Offer may have been removed from cache since last check
.filter(Offer::isReady)
.doOnError(throwable -> {
LOG.info("Exception thrown on realtime events");
})
.forEach(awardChecker::awardFailOrIgnore);
getRealTimeEvents()
returns 一个 Observable<Event>
.
.doOnError
的位置重要吗?另外,在这段代码中添加多个调用它的效果是什么?我意识到我可以做到,所有这些都被调用了,但我不确定它的目的是什么。
doOn???
方法是为了副作用而存在的,比方说,处理并不是您的核心业务价值。日志记录是一个非常好的用途。
也就是说,有时您想对错误做一些更有意义的事情,例如重试或向用户显示消息等...对于这些情况,"rx" 方法是在 subscribe
呼叫.
doOnError
(以及其他 doOn
方法)将原来的 Observable
包装成一个新的并为其添加行为(显然围绕其 onError
方法) .这就是为什么你可以多次调用它的原因。能够在链中的任何位置调用它的另一个好处是,您可以访问本来对流的使用者(Subscriber
)隐藏的错误,例如,因为链中有重试。 ..
是的,确实如此。 doOnError
在特定点通过流时发生错误,因此如果 doOnError
之前的运算符抛出,您的操作将被调用。但是,如果您将 doOnError
放在更靠上的位置,它可能会或可能不会被调用,具体取决于链中的下游运算符。
给定
Observer<Object> ignore = new Observer<Object>() {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
}
@Override public void onNext(Object t) {
}
};
例如,以下代码将始终调用 doOnError:
Observable.<Object>error(new Exception()).doOnError(e -> log(e)).subscribe(ignore);
然而,这段代码不会:
Observable.just(1).doOnError(e -> log(e))
.flatMap(v -> Observable.<Integer>error(new Exception())).subscribe(ignore);
大多数运营商会反弹源自下游的异常。
如果您通过 onErrorResumeNext
或 onExceptionResumeNext
:
doOnError
是可行的
Observable.<Object>error(new RuntimeException())
.doOnError(e -> log(e))
.onErrorResumeNext(Observable.<Object>error(new IllegalStateException()))
.doOnError(e -> log(e)).subscribe(ignore);
否则,您将在链的多个位置记录相同的异常。