无法从反应式方法传播错误

Unable to propagate error from a reactive method

我有以下方法,在其中一个步骤中,我调用了 throwACheckedExceptionMethod() 方法。

该方法有机会抛出已检查的异常。现在我不想在这里处理它。我想要订阅它的方法 处理它。

我该怎么做?因为现在这不能编译,因为我没有在这里处理错误。 添加 doOnError() 或在方法级别抛出错误也无济于事。我似乎在这里遗漏了一些关于 RX Java 的东西。

我能得到一些帮助吗?谢谢。

private Observable<Void> myMethod(Observable<KafkaConsumerRecord<String, String>> records) throws Exception /*Doesn't make a diff*/ {
    return client.rxGetConnection()
            .flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
                    .toCompletable()
                    .andThen(records.flatMapSingle(record -> throwACheckedExceptionMethod(connection, record)).toCompletable()) // line in question
                    .andThen(connection.rxCommit().toCompletable())
                    .andThen(connection.rxSetAutoCommit(true).toCompletable())
            )
            .toObservable();
}

private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) throws Exception {
    // some operation which may return a Single. 
    throw new Exception();
}

以下是一个可能的选项,但我不想像上面提到的那样处理这里的错误。

private Observable<Void> processTransaction(Observable<KafkaConsumerRecord<String, String>> records) {

    return client.rxGetConnection()
            .flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
                    .toCompletable()
                    .andThen(records.flatMapSingle(record -> {
                        try {
                            return throwACheckedExceptionMethod(connection, record);
                        } catch (Exception throwables) {
                            throwables.printStackTrace();
                        }
                    })
                            .toCompletable())
                    .andThen(connection.rxCommit().toCompletable())
                    .andThen(connection.rxSetAutoCommit(true).toCompletable())
            )
            .toObservable();
}

throwACheckedExceptionMethod 方法不应抛出异常,因为抛出异常会破坏可观察链。

相反,它应该 return 一个 发出 错误的可观察对象,Single.error:

private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) throws Exception {
    // some operation which may return a UpdateResult 
    return Single.error(new Exception());
}

现在异常被传播到订阅者的 onError 方法。