RxJS 只重试最后一项,阻塞流

RxJS retry only last item, blocking the stream

我正在尝试使用 RxJS 来处理项目流,我希望它重试任何失败,优先延迟(如果可能的话,指数退避),但我需要保证顺序我实际上想要阻止流,并且我不想重新处理任何已经处理过的项目

所以我试着用 retryWhen 玩,按照这个例子:

const { interval, timer } = Rx;
const { take, map, retryWhen, delayWhen, tap } = RxOperators;

const source = take(5)(interval(1000));

source.pipe(
  map(val => {
    if (val >= 3) {
      throw val;
    }
    return val;
  }),
  retryWhen(errors =>
    errors.pipe(
      delayWhen(val => timer(1000))
    )
  )
);

但是流在开始时重新启动,它不只是重试最后一个:

是否可以达到我想要的效果?我也尝试了文档中的其他运算符,但没有运气。它会不会有点违背 RxJS 哲学?

应将 retryWhen 移动到内部 Observable 以仅处理失败值并保持主要 Observable 正常工作。

尝试如下操作:

// import { timer, interval, of } from 'rxjs';
// import { concatMap, delayWhen, map, retryWhen, take } from 'rxjs/operators';

const source = interval(1000).pipe(take(5));

source.pipe(
  concatMap((value) =>
    of(value).pipe(
      map((val) => {
        if (val >= 3) {
          throw val;
        }
        return val;
      }),
      retryWhen((errors) => errors.pipe(delayWhen(() => timer(1000))))
    )
  )
);