RxJS 只重试最后一项,阻塞流
RxJS retry only last item, blocking the stream
我正在尝试使用 RxJS 来处理项目流,我希望它重试任何失败,优先延迟(如果可能的话,指数退避),但我需要保证顺序我实际上想要阻止流,并且我不想重新处理任何已经处理过的项目
所以我试着用 retryWhen
玩,按照这个例子:
const { interval, timer } = Rx;
const { take, map, retryWhen, delayWhen, tap } = RxOperators;
const source = take(5)(interval(1000));
source.pipe(
map(val => {
if (val >= 3) {
throw val;
}
return val;
}),
retryWhen(errors =>
errors.pipe(
delayWhen(val => timer(1000))
)
)
);
但是流在开始时重新启动,它不只是重试最后一个:
是否可以达到我想要的效果?我也尝试了文档中的其他运算符,但没有运气。它会不会有点违背 RxJS 哲学?
应将 retryWhen
移动到内部 Observable
以仅处理失败值并保持主要 Observable
正常工作。
尝试如下操作:
// import { timer, interval, of } from 'rxjs';
// import { concatMap, delayWhen, map, retryWhen, take } from 'rxjs/operators';
const source = interval(1000).pipe(take(5));
source.pipe(
concatMap((value) =>
of(value).pipe(
map((val) => {
if (val >= 3) {
throw val;
}
return val;
}),
retryWhen((errors) => errors.pipe(delayWhen(() => timer(1000))))
)
)
);
我正在尝试使用 RxJS 来处理项目流,我希望它重试任何失败,优先延迟(如果可能的话,指数退避),但我需要保证顺序我实际上想要阻止流,并且我不想重新处理任何已经处理过的项目
所以我试着用 retryWhen
玩,按照这个例子:
const { interval, timer } = Rx;
const { take, map, retryWhen, delayWhen, tap } = RxOperators;
const source = take(5)(interval(1000));
source.pipe(
map(val => {
if (val >= 3) {
throw val;
}
return val;
}),
retryWhen(errors =>
errors.pipe(
delayWhen(val => timer(1000))
)
)
);
但是流在开始时重新启动,它不只是重试最后一个:
是否可以达到我想要的效果?我也尝试了文档中的其他运算符,但没有运气。它会不会有点违背 RxJS 哲学?
应将 retryWhen
移动到内部 Observable
以仅处理失败值并保持主要 Observable
正常工作。
尝试如下操作:
// import { timer, interval, of } from 'rxjs';
// import { concatMap, delayWhen, map, retryWhen, take } from 'rxjs/operators';
const source = interval(1000).pipe(take(5));
source.pipe(
concatMap((value) =>
of(value).pipe(
map((val) => {
if (val >= 3) {
throw val;
}
return val;
}),
retryWhen((errors) => errors.pipe(delayWhen(() => timer(1000))))
)
)
);