为什么 concatMap 没有按预期工作?

Why concatMap is not working as expected?

请看下面我的代码

import { Observable, interval } from 'rxjs';
import { map, take, mergeMap, concatMap, switchMap, exhaustMap } from 'rxjs/operators';

const frameworks = ['Backbone', 'Angular', 'React', 'Vue'];

const getRecruits = agency => new Observable(observer => {
  console.log('agency', agency);
  interval(1000).pipe(
    take(5)
  ).subscribe(val => observer.next(`${agency} Developer ${val}`));
});

// concatMap
interval(3000).pipe(
  take(4),
  map(val => frameworks[val]),
  concatMap(agency => getRecruits(agency))
).subscribe(val => console.log(val));

这是我的输出:

我预计在 Backbone 完成后,它会继续使用 Angular、React 和 Vue。但是在 Backbone

之后执行停止了

有什么解释吗?

要完成它,您需要调用 observer.complete,目前代码仅调用 observer.next

要做到这一点,请像那样更改您的代码

).subscribe(
  val => observer.next(`${agency} Developer ${val}`),
  () => observer.complete(),
  () => observer.complete(),
);

关键知识

conactMap 仅在前一个已完成的情况下发出下一个 observable。

原因

如果您牢记这一点并查看 getRecruits,您会看到内部间隔可观察对象关闭,但实际上您新创建的 new Observable(observer => ...) 并没有,因为您仅通过 nextobserver 发送值。您可以在内部订阅触发完成的同时完成返回的可观察对象。

解决方案

const getRecruits = agency => new Observable(observer => {
  interval(1000).pipe(
    take(5)
  ).subscribe(
    val => observer.next(`${agency} Developer ${val}`),
    () => void 0, // You can use void 0 for no emit
    () => observer.complete() // Your interval source completes here and you call the new Observalbe to be completed
  );
});