Rxjs,重试3次,等待30秒,然后重复

Rxjs, retry 3 times, wait 30 seconds, then repeat

我正在使用 rxjs 连接到 WebSocket 服务,如果失败,我想重试 3 次,等待 30 秒,然后无限重复,我该怎么办?

您可以通过执行以下操作来完成此操作:

    //emit value every 200ms
    const source = Rx.Observable.interval(200);

    //output the observable
    const example = source
        .map(val => {
            if (val > 5) {
                throw new Error('The request failed somehow.');
            }
            return val;
        })
        .retryWhen(errors => errors
            //log error message
            .do(val => console.log(`Some error that occur ${val}, pauze for 30 seconds.`))

            //restart in 30 seconds
            .delayWhen(val => Rx.Observable.timer(30 * 1000))
        );

    const subscribe = example
        .subscribe({
            next: val => console.log(val),
            error: val => console.log(`This will never happen`)
        });


查看工作示例:https://jsbin.com/goxowiburi/edit?js,console

请注意,这是一个无限循环,您不会在代码中引入意外后果。

我找到了解决方案,首先,创建以下运算符:

function retryWithDelay<T>(
  repetitions: number,
  delay: number
): (a: Observable<T>) => Observable<T> {
  let count = repetitions;
  return (source$: Observable<T>) =>
    source$.pipe(
      retryWhen((errors) =>
        errors.pipe(
          delayWhen(() => {
            count--;
            if (count === 0) {
              count = repetitions;
              return timer(delay);
            }
            return timer(0);
          })
        )
      )
    );
}

然后,像这样使用它:

function connect(url: string) {
   return webSocket({ url })
      .pipe(retryWithDelay(3, 30000));
}