你会将 takeUntil RXJS 放在这段轮询服务器的代码中的什么地方?

where would you put takeUntil RXJS in this code that polls a server?

所以下面的代码是根据我在 堆栈溢出问题中得到的答案开发的。

该代码旨在轮询我的服务器,直到服务器上的条件为真,或者轮询已发生一分钟。

我知道我可以在一分钟后使用 takeUntil RXJS 函数停止轮询。但是,我不知道我应该把它放在代码的什么地方。当我把它放在我认为它会去的每个地方时,代码都出错了。

我也在使用 learnrxjs 网站上的这个教程 https://www.learnrxjs.io/learn-rxjs/operators/filtering/takeuntil

您会注意到 startastream() 函数的第一行是

const endtimer = timer(60000);

这是填充 takeUntil() 参数的条件。所以takeUntil(endtimer)

    start$ = this.http.get(environment.shochat_content_creator_set_valid_stream_start).pipe(
    tap(() => console.log('Stream start'))
  );
  poll$ = this.http.get(environment.check_if_stream_is_active_on_mux).pipe(
    tap(() => (this.streamready = true)),
    catchError(error => {
      console.log(error);
      return EMPTY;
    })
  );

  startastream(){
    const endtimer = timer(60000);
    this.streampollsubscription = this.start$.pipe(
      switchMap(() => timer(0, 5000).pipe(
        tap(() => console.log('Polling every 5s')),
        mergeMap(() => this.poll$)
      ))
    ).subscribe();

  }

您可以简单地将它放在 switchMap 之后的管道中:

    this.streampollsubscription = this.start$.pipe(
      switchMap(() => timer(0, 5000).pipe(
        tap(() => console.log('Polling every 5s')),
        mergeMap(() => this.poll$)
      )),
      takeUntil(endtimer)
    ).subscribe();

看看这个 StackBlitz