HTTP 成功响应后停止 RxJS HTTP 轮询

Stop RxJS HTTP Polling after HTTP success response

我有以下模仿 HTTP 请求轮询的代码。

  timeout:Observable<number> = timer(10000);

  startPollingStackblitz(arnId: string) {
    const poll:Observable<BuyingData[]> = of({}).pipe(
        mergeMap(_ => {
          console.log('polling...' + arnId);
          return of([]);
          // return this.service.getData(arnId);
        }),
        takeUntil(this.timeout),
        tap(_ => console.info('---waiting 2 secs to restart polling')),
        delay(2000),
        repeat(),
        tap(_ => console.info('---restarted polling')),
      );

    this.subscription = poll.subscribe((data) => {
      console.log('subscribe...')
      if (data.length > 0) {
        console.log('timeout...');
        console.log(this.timeout);// I want to stop polling immediately before timer will elapse
      }
    });
  }

我希望我的轮询在服务器响应 data.length > 0 时停止发送 HTTP 请求(在此演示版本中记录 'polling...')。由于某种原因,即使在 10000 毫秒超时后它仍继续发送请求。我该怎么做?

Repeat returns 一个 Observable,它将在源流完成时重新订阅源流,在您的情况下,尽管源 Observable 已完成(感谢 takeUntil),使用 repeat会反复重新订阅源流

您可以尝试以下操作,而不是重复:

const poll :Observable<BuyingData[]> = interval(2000).pipe(
  exhaustMap(() => this.service.getData())
  takeUntil(this.timeout),
  takeWhile(data => data.length > 0),
);

嗯,据我了解,您有两个停止条件:

  1. 超时后(10 秒)
  2. 当回复满足您的条件时(data.length > 0)

您可以通过将 takeUntilracetimer 运算符与以下主题组合来实现此目的。

const stopper = new Subject(); // to stop emitting
const poll = of({}).pipe(
  mergeMap(_ =>
    fakeDelayedRequest().pipe(
      catchError(e => {
        console.error(e);
        return of(false);
      })
    )
  ),
  tap(write),
  tap(_ => console.info("---waiting 3 secs to restart polling")),
  delay(3000),
  tap(_ => console.info("---restarted polling")),
  repeat(),
  takeUntil(stopper.pipe(race(timer(10000)))) // this should be the last in the pipe
  // else `repeat` operator will be repeating without a condition.
);

poll.subscribe(_ => {
  const rnd = Math.random();
  if (rnd> 0.3) { // random stop condition
    console.log("closing !",rnd);
    stopper.next(); // emit the stop
  }
});

takeUntil 将在目标可观察对象发出值时停止。 timer 将在 10 秒后发出一个值。 race 将从 stoppertimer 发出一个值,哪个先出现。

Stackblitz