通过超时终止 Observable

Terminate Observable by timeout

我正在尝试通过超时来限制可观察对象的寿命:

def doLongOperation() = {
   Thread.sleep(duration)
   "OK"
}

def firstStep = Observable.create(
  (observer: Observer[String]) => {
    observer.onNext(doLongOperation())
    observer.onCompleted()
    Subscription()
  }
)

firstStep
  .timeout(1 second)
  .subscribe(
    item => println(item),
    throwable => throw throwable,
    () => println("complete")
  ) 

我想区分以下结果:

  1. Observable 超时结束,未获得结果
  2. 执行期间抛出异常
  3. 执行成功,return值

我可以在部分 onNext 和 onError 中毫无问题地处理案例 2 和案例 3,但是我如何检测 observable 是否已超时完成?

还有一件事:尽管我的代码中调用了 obeserver.onCompleted(),但我从未进入块 onComplete。为什么?

如果发生超时,TimeoutException 会在计算线程上发出,其中 throw throwable 最终会被忽略,您的主线程不会也看不到它。您可以在超时后添加 toBlocking,这样任何异常都将在同一线程上结束:

firstStep
  .timeout(1 second)
  .toBlocking()
  .subscribe(
    item => println(item),
    throwable => println(throwable),
    () => println("complete")

)

确实会抛出 TimeoutException。问题是由于使用了错误的库引起的。我的依赖项中有 "com.netflix.rxjava",而不是 "io.reactivex"