mergeMap 运算符如何工作?

How the mergeMap operator works?

我试图理解 mergeMap 在这个例子中是如何工作的,但仍然不明白。你能帮帮我吗?

import { interval } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';


// emit value every 1s
const source$ = interval(1000);

source$
  .pipe(
    mergeMap(
      // project
      val => interval(5000).pipe(take(2)),
      // resultSelector
      (oVal, iVal, oIndex, iIndex) => [oIndex, oVal, iIndex, iVal],
      // concurrent
      2
    )
  )
  /*
        Output:
        [0, 0, 0, 0] <--1st inner observable
        [1, 1, 0, 0] <--2nd inner observable
        [0, 0, 1, 1] <--1st inner observable
        [1, 1, 1, 1] <--2nd inner observable
        [2, 2, 0, 0] <--3rd inner observable
        [3, 3, 0, 0] <--4th inner observable
*/
  .subscribe(val => console.log(val));

https://stackblitz.com/edit/typescript-r3gcr4?file=index.ts&devtoolsheight=100

我不明白控制台中显示的值

先谢谢你了。

让我们将逻辑分解为简单的线程:

您的 source$ 计时器每秒发出值 0、1、2、3...。这个线程通过你的管道,其中 mergeMap 用于修改你的线程。

并发值2表示每2个输入值传完等待执行完成,再传下2个值执行,以此类推

您有选择器:[旧索引,旧值,当前索引,当前值],让我们看看:

  • 第一个值为 0,来自 source$,将生成 [0, 0, 0 ,0],因为传入值为 0,当前值为 mergeMap来自 interval(5000)0
  • 第二个值 1source$ 后 1000 毫秒后传递给 mergeMap,然后变异为值 [1, 1, 0, 0],其中 0, 0 - 作为第一个值和来自 interval(5000) 的索引。对于每个管道值,将生成新的 interval(5000)
  • 5000 毫秒后,它从 mergeMap 1, 1 生成第二个值和索引,因此在值为 0, 0 的第一个管道线程中它生成输出:[0, 0, 1, 1]。在值为 1, 1 的第二个管道中,它生成输出 [1, 1, 1, 1].
  • 现在接下来的两个值从 source$ 转到 mergeMap2, 23, 3。因此,我们在 5000 毫秒后再次使用新的 interval(5000)0, 01, 1 重复这些步骤。所以它生成 [2, 2, 0, 0][3, 3, 0, 0],并在 5000 毫秒后生成 [2, 2, 1, 1][3, 3, 1, 1]

等等。