如何在没有 InterruptedException 的情况下处理 RxJava 中的处置
How to handle dispose in RxJava without InterruptedException
在下面的代码中,当 dispose()
被调用时,发射器线程被中断(InterruptedException
被从睡眠方法中抛出)。
Observable<Integer> obs = Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
if (emitter.isDisposed()) {
System.out.println("> exiting.");
emitter.onComplete();
return;
}
emitter.onNext(i);
System.out.println("> calculation = " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
emitter.onComplete();
});
Disposable disposable = obs
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
从调试会话中我看到中断源自 FutureTask
,它在处理期间被取消。在那里,正在调用 dispose()
的线程与运行线程进行检查,如果不匹配,发射器将被中断。线程不同,因为我使用了计算 Scheduler
.
有没有什么方法可以让 dispose 不中断这样的发射器,或者实际上应该如何处理?我用这种方法看到的一个问题是,当我有一个可中断的操作(在这里通过睡眠模拟)时,我想在调用 onComplete()
.
之前正常完成
请参考What's different in 2.0 - Error handling。
One important design requirement for 2.x is that no Throwable errors should be swallowed. This means errors that can't be emitted because the downstream's lifecycle already reached its terminal state or the downstream cancelled a sequence which was about to emit an error.
因此您可以将所有内容都包装在 try/catch 中并正确发出错误:
Observable<Integer> obs = Observable.create(emitter -> {
try {
// ...
} catch (InterruptedException ex) {
// check if the interrupt is due to cancellation
// if so, no need to signal the InterruptedException
if (!disposable.isDisposed()) {
observer.onError(ex);
}
}
});
或者设置一个全局错误消费者来忽略它:
RxJavaPlugins.setErrorHandler(e -> {
// ..
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
// ...
Log.warning("Undeliverable exception received, not sure what to do", e);
});
在下面的代码中,当 dispose()
被调用时,发射器线程被中断(InterruptedException
被从睡眠方法中抛出)。
Observable<Integer> obs = Observable.create(emitter -> {
for (int i = 0; i < 10; i++) {
if (emitter.isDisposed()) {
System.out.println("> exiting.");
emitter.onComplete();
return;
}
emitter.onNext(i);
System.out.println("> calculation = " + i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
emitter.onComplete();
});
Disposable disposable = obs
.subscribeOn(Schedulers.computation())
.subscribe(System.out::println);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
disposable.dispose();
从调试会话中我看到中断源自 FutureTask
,它在处理期间被取消。在那里,正在调用 dispose()
的线程与运行线程进行检查,如果不匹配,发射器将被中断。线程不同,因为我使用了计算 Scheduler
.
有没有什么方法可以让 dispose 不中断这样的发射器,或者实际上应该如何处理?我用这种方法看到的一个问题是,当我有一个可中断的操作(在这里通过睡眠模拟)时,我想在调用 onComplete()
.
请参考What's different in 2.0 - Error handling。
One important design requirement for 2.x is that no Throwable errors should be swallowed. This means errors that can't be emitted because the downstream's lifecycle already reached its terminal state or the downstream cancelled a sequence which was about to emit an error.
因此您可以将所有内容都包装在 try/catch 中并正确发出错误:
Observable<Integer> obs = Observable.create(emitter -> {
try {
// ...
} catch (InterruptedException ex) {
// check if the interrupt is due to cancellation
// if so, no need to signal the InterruptedException
if (!disposable.isDisposed()) {
observer.onError(ex);
}
}
});
或者设置一个全局错误消费者来忽略它:
RxJavaPlugins.setErrorHandler(e -> {
// ..
if (e instanceof InterruptedException) {
// fine, some blocking code was interrupted by a dispose call
return;
}
// ...
Log.warning("Undeliverable exception received, not sure what to do", e);
});