在 .switchMap 运算符中返回 .empty Observable

Returning .empty Observable inside .switchMap operator

我想我在 RxJS 库中发现了一个错误,或者只是无法理解它的行为。看看下面的代码:

const { Observable, Subject } = Rx;

const source$ = new Subject();

const handle = v => {
  if (v === 0) {
    source$.next(1);
    return Observable.empty();
  }  
  return Observable.interval(1000).startWith(-1).take(3);
}

source$
  .do(e => console.log('source:', e))
  // .delay(1)
  // .switchMap(v => Observable.of(null).switchMap(() => handle(v)))
  .switchMap(v => handle(v))
  .subscribe(e => console.log('handle:', e))

source$.next(0); // <- initial value
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

工作场景:当我运行它与source$.next(1)时,我看到以下结果(正确):

source: 1
handle: -1
handle: 0
handle: 1

错误场景: 当我 运行 它与 source$.next(0) 时,我希望看到:

source: 0
... (exactly the same output as for the previous example)

但我只看到 3 个值:

source: 0
source: 1
handle: -1

解决方法 1: 当我输入 .delay(1) 时,正如您在评论中看到的那样 - 它工作正常,正如我预期的那样。我认为这是因为默认调度程序在同一时间同步执行,而不是在使用延迟时使用异步调度程序。

问题 当我使用 Observable.of(null)(请参阅代码中的注释)时,它也会给我相同的("correct" 或预期的)结果。这是为什么?不应该和以前一样同步吗?

行为正确。是 source 的同步发射让它有点奇怪。

handle 方法中发出 1 会影响一个重入调用,该调用会看到 -1 发出的 startWithbefore 返回 Observable.empty - 因为 startWith 值是同步发出的。

这意味着 switchMap 首先看到 startWith/interval 可观察到,然后看到 empty 可观察到。然后它切换到 empty observable,影响你看到的输出。

添加 delay 防止重新进入调用。