无法从反应式方法传播错误
Unable to propagate error from a reactive method
我有以下方法,在其中一个步骤中,我调用了 throwACheckedExceptionMethod() 方法。
该方法有机会抛出已检查的异常。现在我不想在这里处理它。我想要订阅它的方法
处理它。
我该怎么做?因为现在这不能编译,因为我没有在这里处理错误。
添加 doOnError() 或在方法级别抛出错误也无济于事。我似乎在这里遗漏了一些关于 RX Java 的东西。
我能得到一些帮助吗?谢谢。
private Observable<Void> myMethod(Observable<KafkaConsumerRecord<String, String>> records) throws Exception /*Doesn't make a diff*/ {
return client.rxGetConnection()
.flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
.toCompletable()
.andThen(records.flatMapSingle(record -> throwACheckedExceptionMethod(connection, record)).toCompletable()) // line in question
.andThen(connection.rxCommit().toCompletable())
.andThen(connection.rxSetAutoCommit(true).toCompletable())
)
.toObservable();
}
private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) throws Exception {
// some operation which may return a Single.
throw new Exception();
}
以下是一个可能的选项,但我不想像上面提到的那样处理这里的错误。
private Observable<Void> processTransaction(Observable<KafkaConsumerRecord<String, String>> records) {
return client.rxGetConnection()
.flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
.toCompletable()
.andThen(records.flatMapSingle(record -> {
try {
return throwACheckedExceptionMethod(connection, record);
} catch (Exception throwables) {
throwables.printStackTrace();
}
})
.toCompletable())
.andThen(connection.rxCommit().toCompletable())
.andThen(connection.rxSetAutoCommit(true).toCompletable())
)
.toObservable();
}
throwACheckedExceptionMethod
方法不应抛出异常,因为抛出异常会破坏可观察链。
相反,它应该 return 一个 发出 错误的可观察对象,Single.error
:
private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) throws Exception {
// some operation which may return a UpdateResult
return Single.error(new Exception());
}
现在异常被传播到订阅者的 onError
方法。
我有以下方法,在其中一个步骤中,我调用了 throwACheckedExceptionMethod() 方法。
该方法有机会抛出已检查的异常。现在我不想在这里处理它。我想要订阅它的方法 处理它。
我该怎么做?因为现在这不能编译,因为我没有在这里处理错误。 添加 doOnError() 或在方法级别抛出错误也无济于事。我似乎在这里遗漏了一些关于 RX Java 的东西。
我能得到一些帮助吗?谢谢。
private Observable<Void> myMethod(Observable<KafkaConsumerRecord<String, String>> records) throws Exception /*Doesn't make a diff*/ {
return client.rxGetConnection()
.flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
.toCompletable()
.andThen(records.flatMapSingle(record -> throwACheckedExceptionMethod(connection, record)).toCompletable()) // line in question
.andThen(connection.rxCommit().toCompletable())
.andThen(connection.rxSetAutoCommit(true).toCompletable())
)
.toObservable();
}
private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) throws Exception {
// some operation which may return a Single.
throw new Exception();
}
以下是一个可能的选项,但我不想像上面提到的那样处理这里的错误。
private Observable<Void> processTransaction(Observable<KafkaConsumerRecord<String, String>> records) {
return client.rxGetConnection()
.flatMapCompletable(connection -> connection.rxSetAutoCommit(false)
.toCompletable()
.andThen(records.flatMapSingle(record -> {
try {
return throwACheckedExceptionMethod(connection, record);
} catch (Exception throwables) {
throwables.printStackTrace();
}
})
.toCompletable())
.andThen(connection.rxCommit().toCompletable())
.andThen(connection.rxSetAutoCommit(true).toCompletable())
)
.toObservable();
}
throwACheckedExceptionMethod
方法不应抛出异常,因为抛出异常会破坏可观察链。
相反,它应该 return 一个 发出 错误的可观察对象,Single.error
:
private Single<UpdateResult> throwACheckedExceptionMethod(SQLConnection connection, KafkaConsumerRecord<String, String> record) throws Exception {
// some operation which may return a UpdateResult
return Single.error(new Exception());
}
现在异常被传播到订阅者的 onError
方法。