在为 RXJS observables 编写测试时,如何避免通过我的业务逻辑传递调度程序?

How can I avoid passing the scheduler through my business logic, when writing tests for RXJS observables?

我发现让某些测试通过的唯一方法是明确地将调度程序传递给函数。为了便于说明,考虑这个函数:

function doStuff( stream ){
    return stream.delay(100)
        .filter( x => x % 2 === 0 )
        .map( x => x * 2 )
        .flatMapLatest( x=> Rx.Observable.range( x, x+100) )

还有一个测试:

it('example test', () => {
    let scheduler = new Rx.TestScheduler()
    let xs = scheduler.createHotObservable(
        onNext(210, 1),
        onNext(220, 2),
        onNext(230, 3)
    )

    let res = scheduler.startScheduler(
        () => doStuff( xs, scheduler ),
        {created:0, subscribed:200, disposed:1000})

    expect( res.messages ).toEqual( [
        onNext(321, 4),
        onNext(322, 5),
        onNext(323, 6)
    ] )
})

报错:

    Expected [  ] to equal [ ({ time: 321, value: OnNextNotification({ value: 4, kind: 'N' }), comparer: Function }), ({ time: 322, value: OnNextNotification({ value: 5, kind: 'N' }), comparer: Function }), ({ time: 323, value: OnNextNotification({ value: 6, kind: 'N' }), comparer: Function }) ].

这表明 delay 发生在 真实 时间而不是 TestScheduler 的模拟时间。

如果我将调度程序传递给每个操作员,那么我可以让它工作:

function doStuff( stream, scheduler ){
   return stream.delay( 100, scheduler )
      .filter( x => x % 2 === 0 )
      .map( x => x * 2 )
      .flatMapLatest( x => Rx.Observable.range(x, 3, scheduler ) )
}

但我觉得我应该能够设置一次调度程序,而不必让我的实际生产代码将其线程化。我真的很期待,假设原始流是从 TestScheduler 创建的,然后 运行 通过相同的调度程序,这一切都会自动连接起来。

RX 指南建议 consider passing a specific scheduler to concurrency introducing operators。对于单线程Javascript,没有并发,但是像delay()这样的基于时间的运算符也有类似的问题。

这并没有我最初想象的那么糟糕,因为大多数运算符都没有调度程序参数,而且其中只有一部分是基于时间的。这突出了为什么你会明确地传递一个调度程序。在我上面的示例中,我将调度程序传递给支持它的每个操作员,但结果并不完全符合我的预期 - 我什至调整了我的 "expected" 结果以使其工作:

expect( res.messages ).toEqual( [
    onNext(321, 4),
    onNext(322, 5),
    onNext(323, 6)
] )

但实际上,我希望所有这些时间都是 320

对于 delay(),我需要从测试继承调度程序,但是对于 range(),我想要默认调度程序,因为它会立即安排事件。

我的最终示例代码片段将如下所示:

function doStuff( stream, scheduler ){
    return stream.delay( 100, scheduler )
        .filter( x => x % 2 === 0 )
        .map( x => x * 2 )
        .flatMapLatest( x => Rx.Observable.range(x, 3))
}

创建测试时,您总是希望将 TestScheduler 用于基于时间的运算符。其他操作员可能希望使用 DefaultScheduler,而不管测试是否为 运行.