onErrorResumeNext 后间隔继续工作
Interval continue working after onErrorResumeNext
我正在使用 Interval 运算符,即使我的管道发生异常,我也想继续发射项目。
所以我尝试使用 onErrorResumeNext
发出一个项目以防出现异常。但是我看到在发出这个项目后,间隔停止发射更多项目。
这是我的单元测试。
@Test
public void testIntervalObservableWithError() {
Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
.map(time -> "item\n")
.map(item -> item = null)
.map(String::toString)
.onErrorResumeNext(t-> Observable.just("item with error emitted"))
.subscribe(System.out::print, t->{
System.out.println(t);
}
);
TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription);
testSubscriber.awaitTerminalEvent(20000, TimeUnit.MILLISECONDS);
}
我对这种行为感到困惑,为什么 observable 在收到来自 onErrorResumeNext
的项目时取消订阅
解决方案:
经过一番解释后,我意识到当错误发生时,可观察到的 t 是完整的。所以我最终将可能有异常的可观察对象包装到另一个可观察对象中,并且我使用的是 flatMap。所以主要的 Observable 继续发射项目。
@Test
public void testIntervalObservableWithError() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(time -> "item\n")
.flatMap(item -> Observable.just(item)
.map(String::toString))
.subscribe(System.out::print);
TestSubscriber testSubscriber = new TestSubscriber();
testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
}
如果有任何运算符可以完成所有这些魔术,我想知道。
注册人数
RxJava 流使用的契约是,一旦发出错误,就不应发出更多项。如果您的用例要求流在错误发生后继续,那么您还需要将错误转换为 onNext
发射。创建一个包装器类型说 ValueOrError<T>
并开始考虑 Observable<ValueOrError<T>>
:
Observable<Integer> source = ...
Observable<ValueOrError<Integer>> o =
source.map(x -> {
try {
return new ValueOrError<>(mightThrow(x));
}
catch (Throwable e) {
return new ValueOrError<>(e);
});
您的订阅中断,因为当 onErrorResumeNext
被触发时,您的上游已经完成并出现错误。你只需发出一个项目而不是让异常进入下游。为了保持上游活动,您必须防止在其上抛出异常。
对于您的特定示例解决方案可以是这样的:
...
.map(time -> "item\n")
.map(item -> item = null)
.map(item -> {
try {
return item.toString();
} catch (NullPointerException e) {
return "item with error emitted";
})
//no onErrorResumeNext()
.subscribe ...
onErrorResumeNext
只是用一个项目替换错误并调用 onComplete
.
这里是一个使用 onErrorResumeNext 继续调用 http 服务出错的工作示例。花了几个小时来寻找它应该如何工作(RxJs 的新手),但最终从@Maksim Ostrovidov 的评论中理解了它,因此将它发布在这里。
您可以在 chrome 中通过在开发工具 "Network Tab"
中切换离线复选框来测试它
var interval = 1000;
timer(0, interval).pipe(
switchMap(_ => this.http.get('api/health').pipe(onErrorResumeNext()))
).subscribe(response => this.savedResponse = response);
我正在使用 Interval 运算符,即使我的管道发生异常,我也想继续发射项目。
所以我尝试使用 onErrorResumeNext
发出一个项目以防出现异常。但是我看到在发出这个项目后,间隔停止发射更多项目。
这是我的单元测试。
@Test
public void testIntervalObservableWithError() {
Subscription subscription = Observable.interval(50, TimeUnit.MILLISECONDS)
.map(time -> "item\n")
.map(item -> item = null)
.map(String::toString)
.onErrorResumeNext(t-> Observable.just("item with error emitted"))
.subscribe(System.out::print, t->{
System.out.println(t);
}
);
TestSubscriber testSubscriber = new TestSubscriber((Observer) subscription);
testSubscriber.awaitTerminalEvent(20000, TimeUnit.MILLISECONDS);
}
我对这种行为感到困惑,为什么 observable 在收到来自 onErrorResumeNext
解决方案:
经过一番解释后,我意识到当错误发生时,可观察到的 t 是完整的。所以我最终将可能有异常的可观察对象包装到另一个可观察对象中,并且我使用的是 flatMap。所以主要的 Observable 继续发射项目。
@Test
public void testIntervalObservableWithError() {
Observable.interval(100, TimeUnit.MILLISECONDS)
.map(time -> "item\n")
.flatMap(item -> Observable.just(item)
.map(String::toString))
.subscribe(System.out::print);
TestSubscriber testSubscriber = new TestSubscriber();
testSubscriber.awaitTerminalEvent(5000, TimeUnit.MILLISECONDS);
}
如果有任何运算符可以完成所有这些魔术,我想知道。
注册人数
RxJava 流使用的契约是,一旦发出错误,就不应发出更多项。如果您的用例要求流在错误发生后继续,那么您还需要将错误转换为 onNext
发射。创建一个包装器类型说 ValueOrError<T>
并开始考虑 Observable<ValueOrError<T>>
:
Observable<Integer> source = ...
Observable<ValueOrError<Integer>> o =
source.map(x -> {
try {
return new ValueOrError<>(mightThrow(x));
}
catch (Throwable e) {
return new ValueOrError<>(e);
});
您的订阅中断,因为当 onErrorResumeNext
被触发时,您的上游已经完成并出现错误。你只需发出一个项目而不是让异常进入下游。为了保持上游活动,您必须防止在其上抛出异常。
对于您的特定示例解决方案可以是这样的:
...
.map(time -> "item\n")
.map(item -> item = null)
.map(item -> {
try {
return item.toString();
} catch (NullPointerException e) {
return "item with error emitted";
})
//no onErrorResumeNext()
.subscribe ...
onErrorResumeNext
只是用一个项目替换错误并调用 onComplete
.
这里是一个使用 onErrorResumeNext 继续调用 http 服务出错的工作示例。花了几个小时来寻找它应该如何工作(RxJs 的新手),但最终从@Maksim Ostrovidov 的评论中理解了它,因此将它发布在这里。 您可以在 chrome 中通过在开发工具 "Network Tab"
中切换离线复选框来测试它var interval = 1000;
timer(0, interval).pipe(
switchMap(_ => this.http.get('api/health').pipe(onErrorResumeNext()))
).subscribe(response => this.savedResponse = response);