可观察的异常处理
Observable Exception handling
我正在学习 RxScala 并来到这个非常合成的片段。我正在尝试处理 onError 块中的异常:
def doLongOperation():String = {
(1 to 10).foreach {
_ =>
Thread.sleep(100)
print(".")
}
println()
if (System.currentTimeMillis() % 2 == 0) {
throw new RuntimeException("Something went wrong during long operation")
}
s"OK"
}
def main(args: Array[String]) {
println("Changing status: -> doing task1")
Observable.just(
doLongOperation()
).subscribe(
str => println("Changing status: doing task1 -> doing task2"),
throwable => println(s"Failed: ${throwable.getMessage}"), //never get here
() => println("Completed part")
)
}
如果出现异常,我希望是这样的:
Failed: Something went wrong during long operation
但我得到的是:
.........Exception in thread "main" java.lang.RuntimeException: Something went wrong during long operation
at stats.STest$.doLongOperation(STest.scala:20)
at stats.STest$.main(STest.scala:49)
at stats.STest.main(STest.scala)
我错过了什么?我应该 'manually' 在观察者处调用 onError 吗?感谢您的帮助。
Observable.just
没有很好地处理异常情况,不确定这是错误还是预期行为。 不过你可以这样试试:
Observable.create[String]( o => {
o.onNext(doLongOperation())
o.onCompleted()
Subscription{}
}).subscribe(
str => println("Changing status: doing task1 -> doing task2"),
throwable => println(s"Failed: ${throwable.getMessage}"), here
() => println("Completed part")
)
问题是对 just() 的误解。它在序列组装时采用现有值,而不是在订阅者订阅时执行的方法。换句话说,您的代码是这样做的:
var tempValue = doLongOperation();
Observable.just(tempValue).subscribe(...)
甚至在创建 Observable 之前就抛出。
(抱歉,我不太了解 Scala 或 RxScala,请原谅我的 Java 8 个示例。)
我不知道 RxScala 比 RxJava 落后多少,但是 RxJava 1.0.15 有一个新的工厂方法,fromCallable
让你延迟一个单一的值:
Observable.fromCallable(() -> doLongOperation()).subscribe(...)
另一种方法是将您的原始来源包装到 defer
中,这样当 doLongOperation
抛出时,它会被路由到订阅者:
Observable.defer(() -> Observable.just(doLongOperation())).subscribe(...)
我正在学习 RxScala 并来到这个非常合成的片段。我正在尝试处理 onError 块中的异常:
def doLongOperation():String = {
(1 to 10).foreach {
_ =>
Thread.sleep(100)
print(".")
}
println()
if (System.currentTimeMillis() % 2 == 0) {
throw new RuntimeException("Something went wrong during long operation")
}
s"OK"
}
def main(args: Array[String]) {
println("Changing status: -> doing task1")
Observable.just(
doLongOperation()
).subscribe(
str => println("Changing status: doing task1 -> doing task2"),
throwable => println(s"Failed: ${throwable.getMessage}"), //never get here
() => println("Completed part")
)
}
如果出现异常,我希望是这样的:
Failed: Something went wrong during long operation
但我得到的是:
.........Exception in thread "main" java.lang.RuntimeException: Something went wrong during long operation
at stats.STest$.doLongOperation(STest.scala:20)
at stats.STest$.main(STest.scala:49)
at stats.STest.main(STest.scala)
我错过了什么?我应该 'manually' 在观察者处调用 onError 吗?感谢您的帮助。
不过你可以这样试试:Observable.just
没有很好地处理异常情况,不确定这是错误还是预期行为。
Observable.create[String]( o => {
o.onNext(doLongOperation())
o.onCompleted()
Subscription{}
}).subscribe(
str => println("Changing status: doing task1 -> doing task2"),
throwable => println(s"Failed: ${throwable.getMessage}"), here
() => println("Completed part")
)
问题是对 just() 的误解。它在序列组装时采用现有值,而不是在订阅者订阅时执行的方法。换句话说,您的代码是这样做的:
var tempValue = doLongOperation();
Observable.just(tempValue).subscribe(...)
甚至在创建 Observable 之前就抛出。
(抱歉,我不太了解 Scala 或 RxScala,请原谅我的 Java 8 个示例。)
我不知道 RxScala 比 RxJava 落后多少,但是 RxJava 1.0.15 有一个新的工厂方法,fromCallable
让你延迟一个单一的值:
Observable.fromCallable(() -> doLongOperation()).subscribe(...)
另一种方法是将您的原始来源包装到 defer
中,这样当 doLongOperation
抛出时,它会被路由到订阅者:
Observable.defer(() -> Observable.just(doLongOperation())).subscribe(...)