Rxjs,重试3次,等待30秒,然后重复
Rxjs, retry 3 times, wait 30 seconds, then repeat
我正在使用 rxjs 连接到 WebSocket 服务,如果失败,我想重试 3 次,等待 30 秒,然后无限重复,我该怎么办?
您可以通过执行以下操作来完成此操作:
//emit value every 200ms
const source = Rx.Observable.interval(200);
//output the observable
const example = source
.map(val => {
if (val > 5) {
throw new Error('The request failed somehow.');
}
return val;
})
.retryWhen(errors => errors
//log error message
.do(val => console.log(`Some error that occur ${val}, pauze for 30 seconds.`))
//restart in 30 seconds
.delayWhen(val => Rx.Observable.timer(30 * 1000))
);
const subscribe = example
.subscribe({
next: val => console.log(val),
error: val => console.log(`This will never happen`)
});
查看工作示例:https://jsbin.com/goxowiburi/edit?js,console
请注意,这是一个无限循环,您不会在代码中引入意外后果。
我找到了解决方案,首先,创建以下运算符:
function retryWithDelay<T>(
repetitions: number,
delay: number
): (a: Observable<T>) => Observable<T> {
let count = repetitions;
return (source$: Observable<T>) =>
source$.pipe(
retryWhen((errors) =>
errors.pipe(
delayWhen(() => {
count--;
if (count === 0) {
count = repetitions;
return timer(delay);
}
return timer(0);
})
)
)
);
}
然后,像这样使用它:
function connect(url: string) {
return webSocket({ url })
.pipe(retryWithDelay(3, 30000));
}
我正在使用 rxjs 连接到 WebSocket 服务,如果失败,我想重试 3 次,等待 30 秒,然后无限重复,我该怎么办?
您可以通过执行以下操作来完成此操作:
//emit value every 200ms
const source = Rx.Observable.interval(200);
//output the observable
const example = source
.map(val => {
if (val > 5) {
throw new Error('The request failed somehow.');
}
return val;
})
.retryWhen(errors => errors
//log error message
.do(val => console.log(`Some error that occur ${val}, pauze for 30 seconds.`))
//restart in 30 seconds
.delayWhen(val => Rx.Observable.timer(30 * 1000))
);
const subscribe = example
.subscribe({
next: val => console.log(val),
error: val => console.log(`This will never happen`)
});
查看工作示例:https://jsbin.com/goxowiburi/edit?js,console
请注意,这是一个无限循环,您不会在代码中引入意外后果。
我找到了解决方案,首先,创建以下运算符:
function retryWithDelay<T>(
repetitions: number,
delay: number
): (a: Observable<T>) => Observable<T> {
let count = repetitions;
return (source$: Observable<T>) =>
source$.pipe(
retryWhen((errors) =>
errors.pipe(
delayWhen(() => {
count--;
if (count === 0) {
count = repetitions;
return timer(delay);
}
return timer(0);
})
)
)
);
}
然后,像这样使用它:
function connect(url: string) {
return webSocket({ url })
.pipe(retryWithDelay(3, 30000));
}