是否可以在RxAndroid的调用方法中重新抛出错误?

Is it possible to re-throw an error in the calling method in RxAndroid?

受到.Net TPL的启发,我试图找到一种方法来处理 RX 管道外的错误。具体来说,如果出现错误,我希望 Observer 管道停止,并将控制权交还给周围的方法。类似于:

public void testRxJava() {
    try {
        Observable.range(0, 5)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .map(i -> { throw new RuntimeException(); })
            .subscribe();
    } catch (Exception ex) {
        // I was hoping to get here on the main thread, but crashed instead
        Log.i("Test", "Will never get here");
    }
}

这将导致应用程序崩溃并出现 io.reactivex.rxjava3.exceptions.OnErrorNotImplementedException,它不会被 catch 子句捕获,而是调用主线程的 uncaughtException() 处理程序。

尝试从 subscribe() 中的错误处理程序中抛出也不起作用,再次回退到 uncaughtException() 处理程序。

有没有办法重新抛出或以其他方式将错误信息传递给调用方法?

找到了 C# 的类似问题 here

你试过像这样捕获错误吗

    Observable.range(0, 5)
        .subscribeOn(Schedulers.newThread())
        .doOnError {
            //your error caught here
        }         
        .observeOn(AndroidSchedulers.mainThread())
        .map({ i -> throw RuntimeException() })
        .subscribe()

这是我最后做的。据我所知,这是唯一离开 ReactiveX 管道,让周围的代码处理错误的方法。如果有人有更优雅的方式会很高兴:

public void testRxJava() {
    try {
        // will be null if no error, will hold a Throwable on error
        AtomicReference<Throwable> opError = new AtomicReference<>(null);

        Observable.range(0, 5)
            .subscribeOn(Schedulers.newThread())
            .observeOn(AndroidSchedulers.mainThread())
            .map(i -> { throw new RuntimeException(); }) // throws
            .blockingSubscribe(
                result -> Log.i(TAG, "will never happen"),
                error -> { opError.set(error); } // sets reference to the error without crashing the app
            );

        // re-throw
        if(opError.get() != null) {
            throw new Exception(opError.get());
        }

    } catch (Exception ex) {
        Log.e("Test", "exception", ex);
    }
}