使用 rxjs 运算符在订阅延迟后输出

Outputting after a delay on subscribe using rxjs operators

我需要得到如下输出:

computing 1
5
6
7
8

wait 2 seconds

computing 2
5
6
7
8

wait 2 seconds
...

但使用以下代码

from([1,2,3,4]).pipe(
    concatMap(n => of(n).pipe(
        tap(n => {console.log(`computing ${n}`)}),
        concatMap(n => from([5,6,7,8])),
        delay(2000)
    ))
).subscribe((val) => {console.log(val)}, () => {}, () => {console.log(`end`)})

输出将是

computing 1

wait 2 seconds

5
6
7
8
computing 2

wait 2 seconds

5
6
7
8
computing 3

因为 delay 将在最内层扁平化后生效,并导致下一个 computing x 字符串在值发出后立即打印。相反,我需要在没有初始延迟的情况下获得上面的示例输出,这可能吗?

首先我们设置一个函数来创建一个保持打开一段时间然后完成的可观察对象。

const nothingFor = (ms) => timer(ms).pipe(concatMapTo(EMPTY));

然后我们用它来创建一个新的运算符,其行为类似于 delay*,但在之后应用延迟。

const appendDelay = (delay) => (source$) =>
  of(source$, nothingFor(delay)).pipe(concatAll());

然后我们就把它放到你原来使用的地方 delay

from([1, 2, 3, 4])
  .pipe(
    concatMap((n) =>
      of(n).pipe(
        tap((n) => {
          console.log(`computing ${n}`);
        }),
        concatMap((n) => from([5, 6, 7, 8]).pipe(appendDelay(2000)))
      )
    )
  )

* 好吧,有点。 delay 将每次发射延迟相同的量。如果这更像 delay,它会在每次发射后而不是在源完成后添加延迟。

这个问题的症结在于,我们只想在第一个源可观察到的第一次发射之后引入延迟。

我们可以将 concatMapdelayWhen 运算符结合使用,让我们选择有条件的延迟。条件延迟可以建立在第一个可观察量的迭代索引之上。如果索引为0,则没有延迟,否则有2秒延迟。

在第一个 observable 和 post 条件延迟中的每个项目发射后,我们切换到第二个 observable 流 switchMap 这将帮助我们获得所需的输出。

import { from, iif, interval, of } from "rxjs";
import { concatMap, delayWhen, map, switchMap, tap } from "rxjs/operators";

const a$ = from([1, 2, 3, 4]);
const b$ = from([5, 6, 7, 8]);

const conditionalDelay$ = index => y =>
  iif(() => index === 0, interval(0), interval(2000));

const logDelay = index => () => {
  if (index !== 0) console.log("Wait for 2 seconds");
};

const result$ = a$.pipe(
  concatMap((x, index) =>
    of(x).pipe(
      tap(logDelay(index)),
      delayWhen(conditionalDelay$(index))
    )
  ),
  switchMap(x => {
    console.log("Computing", x);
    return b$.pipe(map(y => console.log(y)));
  })
);

result$.subscribe(x => console.log(x));

对您的代码进行一个小的调整就可以让它按照您想要的方式工作。不是简单地 return 在你的 concatMap 中设置你想要的 observable (of),你可以 return 一个 observable,第一个发出你想要的 observable,然后是一个发出你想要的 observable什么都没有,然后在延迟后完成。

要实现这一点,我们可以使用 concat and NEVER:

from([1,2,3,4]).pipe(
    concatMap(n => concat(
      of(n).pipe(
        tap(n => {console.log(`computing ${n}`)}),
        concatMap(n => from([5,6,7,8]))
      ),
      NEVER.pipe(takeUntil(timer(2000))) // emit nothing, then complete after 2000 ms
    ))
).subscribe((val) => {console.log(val)}, () => {}, () => {console.log(`end`)})

这是一个有效的 StackBlitz 演示。