单个和多个 .pipe 之间的区别

Difference between single and multiple .pipe

我正在为此目的编写一个可观察流:
有一个从 Web 请求发出的可观察对象和 2 个触发器,一个用于重新发出获得的值,另一个触发器用于重新加载值(重新执行请求)。此外,迟到的订阅者应该获得最后获得的值并能够接收 reset/reload.

最初我得到了以下代码(没有用):

public readonly initialValueFlat$ = this.seed$
  .pipe(
    switchMap(seed => defer(
      () => {
        this._isLoading$$.next(true);
        return this.getFakeData(seed)
          .pipe(
            finalize(() => this._isLoading$$.next(false)));
      })),
    repeatWhen(x => this._reloadTrigger$),
    switchMap(x => of(x)),
    repeatWhen(x => this._resetTrigger$),
    shareReplay(1));

经过努力,我找到了以下解决方案:

public readonly initialValue$ = this.seed$
  .pipe(
    switchMap(seed => defer(
      () => {
        this._isLoading$$.next(true);
        return this.getFakeData(seed)
          .pipe(
            finalize(() => this._isLoading$$.next(false)));
      })
      .pipe(
        repeatWhen(x => this._reloadTrigger$))
      .pipe(
        switchMap(x =>
          of(x)
            .pipe(
              repeatWhen(x => this._resetTrigger$))))),
    shareReplay(1));

但我不完全清楚为什么第一个不起作用,以及多个 .pipe 运算符如何影响链。 你能帮我说清楚吗? 谢谢

关于管道

Pipe 本身不做任何事情。它只是为您编写运算符。以下都是一样的:

of(1,2,3,4,5).pipe(
  take(3),
  map(v => v + 1),
  filter(v => v != 3)
);

of(1,2,3,4,5).pipe(
  take(3),
).pipe(
  map(v => v + 1),
  filter(v => v != 3)
);

of(1,2,3,4,5).pipe(
  take(3),
  map(v => v + 1)
).pipe(
  filter(v => v != 3)
);

of(1,2,3,4,5).pipe(
  take(3)
).pipe(
  map(v => v + 1)
).pipe(
  filter(v => v != 3)
);

of(1,2,3,4,5).pipe().pipe().pipe().pipe().pipe(
  take(3),
  map(v => v + 1),
  filter(v => v != 3)
).pipe().pipe().pipe().pipe().pipe().pipe().pipe();

您的代码

我已经在您的代码中添加了每一步的评论。

希望这可以让您了解为什么这两个可观察值非常不同。

我无能为力,因为我不清楚你想要完成什么。

评论 initialValueFlat$

const initialValueFlat$ = this.seed$.pipe(

  // SwitchMap subscribes to seed$ as source, then creates
  // a new getFakeData stream for each emission, cancelling old streams
  // (The defer you had here does nothing so I've removed)
  switchMap(seed => {
    this._isLoading$$.next(true);
    return this.getFakeData(seed).pipe(
      finalize(() => this._isLoading$$.next(false))
    );
  }),

  // Once the switchMap completes, resubscribes to the switchmap when
  // reloadTrigger$ emits. SwitchMap will resubscribe to seed$
  repeatWhen(_ => this._reloadTrigger$),

  // Creates a new observable that's identical to the source observable.
  // Soooo, this wastes CPU cycles and does nothing.
  switchMap(x => of(x)),

  // Same as the last one since the swtichmap between the two is 
  // effectively a no-op
  repeatWhen(_ => this._resetTrigger$),

  // Make this observable hot and let subscribers share values
  shareReplay(1)
);

评论初始值$

const initialValue$ = this.seed$.pipe(

  // SwitchMap subscribes to seed$ as source, then creates
  // a new getFakeData stream for each emission, cancelling old streams.
  switchMap(seed => 
    // It's defer that gets repeated, so unlike the previous code, if
    // you want isLoading$$.next to fire on each repeat, you do need defer
    defer(() => { 
      this._isLoading$$.next(true);
      return this.getFakeData(seed).pipe(
        finalize(() => this._isLoading$$.next(false)),
      );
    }).pipe(
      // Once defer completes, this will resubscribe to defer when 
      // reloadTrigger$ emits. This doesn't resubscribe to seed$, it
      // will end up calling getFakeData(seed) again.
      repeatWhen(_ => this._reloadTrigger$),

      switchMap(x => of(x).pipe(
        // Once the source completes (which is always going to happen 
        // immediately after x is emitted), this will emit x every time
        // reloadTrigger$ emits.
        repeatWhen(_ => this._reloadTrigger$)
      ))
    )
  ),

  // Make this observable hot and let subscribers share values
  shareReplay(1)
);