清除订阅时处理 Observable.fromCallable() 内的异常

Handling exceptions inside Observable.fromCallable() when subscription gets cleared

我的情况是 运行 长进程被包裹在 Observable.fromCallable() 中。此过程是一个 OkHttp 调用,如果终止,将抛出一个 IOException。如果订阅了可观察对象,则一次性存储在 CompositeDisposable 中,并按预期处理异常。但是,我的代码在某些情况下会清除 CompositeDisposable,触发 OkHttp 线程终止而没有错误处理,导致应用程序因未处理的异常而崩溃。这是这个问题的一个简单的单元测试示例:

@Test
public void test(){
    CompositeDisposable compositeDisposable = new CompositeDisposable();
    Observable<Object> o = Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            System.out.println("sleeping - this sleep will be interrupted when compositeDisposable gets cleared");
            Thread.sleep(3000);
            return null;
        }
    });
    compositeDisposable.add(o.subscribeOn(new IoScheduler()).subscribe());
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    compositeDisposable.clear();
}

有什么办法可以解决这个问题吗?

不像RxJava1,RxJava2不会把这个Exception传递给Subscriber onError(),因为你调用cancel()取消订阅并且不想再收到通知,所以这种Exceptions取消订阅代码发生的情况现在默认为 Thread.currentThread().getUncaughtExceptionHandler().uncaughtException().

您可以使用 try catch 来包装取消时可能发生的这种异常,或者使用以下方法覆盖默认行为:

RxJavaPlugins.setErrorHandler(Functions.<Throwable>emptyConsumer()); 

或您想要的任何其他处理方式。

您还应该阅读 full explanation by akarnokd at RxJava github。
另请参阅此 discussion 以获取上述解决方案。