从 Futures/Iterables 测试 rx-observables

Testing rx-observables from Futures/Iterables

我有:

val observable: Observable[Int] = Observable.from(List(5))

我可以通过测试来测试输入列表是否确实传递给了可观察对象:

materializeValues(observable) should contain (5)

其中 materializeValues 是:

def materializeValues[T](observable: Observable[T]): List[T] = {
  observable.toBlocking.toIterable.toList
}

现在,如果我从未来创建一个可观察对象,我似乎无法使用 materializeValues 进行测试,因为测试超时。所以如果我有:

val futVal = Future.successful(5)
val observable: Observable[Int] = Observable.from(futVal)
materializeValues(observable) should contain(5)

超时,未通过测试。这两个observable具体化的过程有什么不同,导致我无法阻塞?

此外,测试可观察对象的惯用方法是什么?有什么办法可以不调用 toBlocking?

我认为问题在于您使用了 AsyncWordSpecLike(顺便问一下,为什么使用 AsyncWordSpecLike 而不是 AsyncWordSpec?)。 AsyncWordSpecLike/AsyncWordSpec 旨在简化测试 Future。不幸的是 Observable 是一个更强大的抽象,不能轻易映射到 Future.

特别是 AsyncWordSpecLike/AsyncWordSpec 让您的测试 return Future[Assertion]。为了使其成为可能,它提供了自定义隐式 ExecutionContext ,它可以强制执行所有内容并知道所有计划的作业何时完成。但是,相同的自定义 ExecutionContext 是您的第二个代码不起作用的原因:计划作业的处理仅在您的测试代码执行完成后开始,但您的代码在 futVal 上阻塞,因为不幸的是您在 Future.onComplete 中注册的回调计划在 ExecutionContext 上 运行。这意味着你有一种与你自己的线程的死锁。

我不确定在 Scala 上测试 Observable 的官方方法是什么。在 Java 中,我认为 TestSubscriber 是建议的工具。正如我所说 Observable 从根本上说比 Future 更强大,所以我认为要测试 Observable 你应该避免使用 AsyncWordSpecLike/AsyncWordSpec。如果你切换到使用 FlatSpecWordSpec,你可以这样做:

class MyObservableTestSpec extends WordSpec with Matchers {

  import scala.concurrent.ExecutionContext.Implicits.global
  val testValue = 5

  "observables" should {

    "be testable if created from futures" in {
      val futVal = Future.successful(testValue)
      val observable = Observable.from(futVal)

      val subscriber = TestSubscriber[Int]()
      observable(subscriber)
      subscriber.awaitTerminalEvent
      // now after awaitTerminalEvent you can use various subscriber.assertXyz methods
      subscriber.assertNoErrors
      subscriber.assertValues(testValue)
      // or you can use Matchers as 
      subscriber.getOnNextEvents should contain(testValue)
    }

  }

}