RxJs:返回后立即执行一个 long 运行 函数,给调用者一个 Observable 用于更新

RxJs: Execute a long running function immediately after returning, giving the caller an Observable for updates

我有一个函数可以被认为是 long 运行ning(实际上,它是多步骤的,其中每个步骤都可以等待外部事件,例如来自 HTTP 调用的响应)。

有一个调用函数,它应该 return 一个可观察到的 return 对原始函数的更新。原始函数必须 运行 无论 returned 可观察对象是否被订阅。基本上,它需要 return 一个热门的可观察对象。

我尝试了以下方法,但无法正常工作:

function longRunningOperation(): Observable<number> {
  const updates$ = new Subject<number>();
  Promise.resolve().then(() => {
    console.log('starting updates...');
    updates$.next(1);
    updates$.next(2);
    updates$.next(3);
    updates$.complete();
  });
  return updates$;
}

如果我对上面的内容进行弹珠测试,我发现实际生成的事件是空的(尽管函数确实执行了)。

  it('should return and start executing', () => {
    const updates$ = longRunningOperation();
    const marbles = '(abc|)';
    const events = { a: 1, b: 2, c: 3 };

    new TestScheduler((actual, expected) =>
      expect(actual).toEqual(expected)
    ).run(({ expectObservable }) => {
      expectObservable(updates$).toBe(marbles, events);
      console.log('Test Scheduler subscribed');
    });
  });

我做错了什么?

Link 进行演示 https://stackblitz.com/edit/jasmine-in-angular-upoavr?file=src/app/app.component.spec.ts

我不认为你可以用弹珠来测试这个场景,至少以一种简单的方式。

Marbles 是一种同步机制,而Promise总是异步

因此,当您的测试执行 TestSchedulerrun 方法时,它会同步执行。 Promise 虽然稍后将由 JS 引擎解析,因此只有稍后 update$ 主题才会发出其值。

这就是为什么测试说它从 update$ 收到 0 个通知而不是预期的 4 个通知的原因。来自 update$ 的通知将在断言被评估后发出。

如果你想在没有弹珠的情况下测试这个场景,你可以这样做

describe('Testing tests', () => {
  it('should return and start executing', (done) => {
    const updates$ = longRunningOperation();
    const expected = [1, 2, 3]
    const actual = []

    updates$.subscribe({
      next: d => {
        actual.push(d)
      },
      complete: () => {
        // expected equal to actual
        expect(actual.length).toEqual(expected.length)
        actual.forEach((v, i) => expect(v).toEqual(expected[i]))
        done()
      }
    })

  });
});

this stackblitz.

中可以看出

更新

也许有一种方法可以用弹珠测试你的场景,但这需要改变你的函数结构。

根据我的经验,Marble 测试定义某种源流,对此类流应用某种转换,然后将转换结果与预期的流值进行比较。

在这种情况下,源流可以是一个只通知一次的简单 Observable。此通知会触发 long 运行ning 函数 的执行,它本身会导致 update$ 通知。

所以,我们可以像这样稍微改变 longRunningOperation 函数

function longRunningOperation_(start$: Observable<any>): Observable<number> {
  const updates$ = new Subject<number>();
  // as soon as start$ notifies, the long running function is executed
  start$.pipe(
    tap(() => {
      console.log('starting updates...');
      updates$.next(1);
      updates$.next(2);
      updates$.next(3);
      updates$.complete();
    })
  )
    .subscribe();

  return updates$;
}

完成后,您就为源流创建了 space(在本例中由作为参数传递给 longRunningOperation_ 函数的 Observable 表示),因此可以进行测试运行 像这样

it('should return and start executing', () => {
    new TestScheduler((actual, expected) => {
      console.log('Comparing...');
      console.log('actual:', actual);
      console.log('expected:', expected);
      return expect(actual).toEqual(expected);
    }).run(({ hot, expectObservable }) => {
      const start = hot('0');
      const marbles = '(abc|)';
      const events = { a: 1, b: 2, c: 3 };

      const updates$ = longRunningOperation_(start);
      expectObservable(updates$).toBe(marbles, events);
      console.log('Sheduler subscribed');
    });
  });

完整示例请查看 this stackblitz