如果在超时内没有 next(...) 调用,则重试(重新订阅)源可观察
Retry (resubscribe) to source observable if no next(...) call within a timeout
我正在尝试获取一个 rxjs 源可观察对象,表示向我推送数据的网络连接,如果我在超时期限内没有收到数据,则重新连接(通过重新订阅源可观察对象)。我当然可以用一些 hacky 的方式来写这个,但是有没有用 rxjs 简洁地写这个的好方法?
我最终写了一个运算符。我认为有更好的方法来做到这一点,但鉴于其他人也没有想法,这是我写的可管道运算符:
import { Observable, Subscription } from "rxjs";
export function retryAfterTimeout<T>(timeout: number, allowCompletion = false): (obs: Observable<T>) => Observable<T> {
return source => new Observable<T>(observer => {
let sub: Subscription | undefined;
let timer: number | undefined;
function resetTimer() {
if (timer) clearTimeout(timer);
timer = window.setTimeout(() => resub(), timeout);
}
function resub() {
if (sub) sub.unsubscribe();
sub = source.subscribe({
next(x) {
resetTimer();
observer.next(x);
},
error(err) {
observer.error(err);
},
complete() {
if (allowCompletion)
observer.complete();
else
resub();
}
});
}
resub();
resetTimer();
return () => {
if (sub) sub.unsubscribe();
if (timer) window.clearTimeout(timer);
};
});
}
我正在尝试获取一个 rxjs 源可观察对象,表示向我推送数据的网络连接,如果我在超时期限内没有收到数据,则重新连接(通过重新订阅源可观察对象)。我当然可以用一些 hacky 的方式来写这个,但是有没有用 rxjs 简洁地写这个的好方法?
我最终写了一个运算符。我认为有更好的方法来做到这一点,但鉴于其他人也没有想法,这是我写的可管道运算符:
import { Observable, Subscription } from "rxjs";
export function retryAfterTimeout<T>(timeout: number, allowCompletion = false): (obs: Observable<T>) => Observable<T> {
return source => new Observable<T>(observer => {
let sub: Subscription | undefined;
let timer: number | undefined;
function resetTimer() {
if (timer) clearTimeout(timer);
timer = window.setTimeout(() => resub(), timeout);
}
function resub() {
if (sub) sub.unsubscribe();
sub = source.subscribe({
next(x) {
resetTimer();
observer.next(x);
},
error(err) {
observer.error(err);
},
complete() {
if (allowCompletion)
observer.complete();
else
resub();
}
});
}
resub();
resetTimer();
return () => {
if (sub) sub.unsubscribe();
if (timer) window.clearTimeout(timer);
};
});
}