RXJS flatMap 到重复可观察对象
RXJS flatMap to repetitive observable
我正在尝试实现服务,如果应用程序连接到我的服务器,它会提供可观察的服务,因此当浏览器在线时,我们会使用计时器 ping 服务器。这是代码:
public get $connected(): Observable<boolean> {
return this.hasInternetConnection
.asObservable()
.pipe(
distinctUntilChanged(),
flatMap((connected: boolean) => {
if (!connected) {
return of(connected);
} else {
return timer(5000)
.pipe(
map(() => {
var success = Math.random() > 0.5;
console.log('PING: ' + success);
return success;
})
);
}
})
);
}
hasInternetConnection
只是绑定到 window online
和 offline
事件的 BehaviorSubject,定时器模拟 ping 到我的 API 服务器。
问题是我的订阅 $connected
仅从计时器可观察到的第一个值捕获然后不起作用。在 hasInternetConnection
主题更改为 false
并返回到 true
后,我的订阅再次获得第一个值,然后什么都没有。这是我在控制台中看到的内容:
PING: true
subscription tap
PING: true
PING: false
PING: true
...
我该如何解决?谢谢!
完整解决方案:
private hasInternetConnection: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(navigator.onLine);
private connectedSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(true);
private recheckConnectionSubject: Subject<void> = new Subject<void>();
constructor(
private readonly http: HttpClient,
) {
fromEvent(window, 'online')
.pipe(takeUntil(this.destroyed))
.subscribe(() => {
this.hasInternetConnection.next(true);
});
fromEvent(window, 'offline')
.pipe(takeUntil(this.destroyed))
.subscribe(() => {
this.hasInternetConnection.next(false);
});
merge(
this.hasInternetConnection,
this.recheckConnectionSubject,
)
.pipe(
mapTo(this.hasInternetConnection.value),
switchMap((connected: boolean) => {
if (!connected) {
return of(connected);
} else {
return timer(0, 30000)
.pipe(
mergeMapTo(this.http.get(`${environment.apiRoot}/ping`, { responseType: 'text' })
.pipe(
map((res) => {
return true;
}),
catchError(() => {
return of(false);
})
)
),
);
}
})
)
.subscribe(this.connectedSubject);
}
public get $connected(): Observable<boolean> {
return this.connectedSubject.asObservable()
.pipe(
distinctUntilChanged(),
);
}
public resetTimer(): void {
this.recheckConnectionSubject.next();
}
我正在尝试实现服务,如果应用程序连接到我的服务器,它会提供可观察的服务,因此当浏览器在线时,我们会使用计时器 ping 服务器。这是代码:
public get $connected(): Observable<boolean> {
return this.hasInternetConnection
.asObservable()
.pipe(
distinctUntilChanged(),
flatMap((connected: boolean) => {
if (!connected) {
return of(connected);
} else {
return timer(5000)
.pipe(
map(() => {
var success = Math.random() > 0.5;
console.log('PING: ' + success);
return success;
})
);
}
})
);
}
hasInternetConnection
只是绑定到 window online
和 offline
事件的 BehaviorSubject,定时器模拟 ping 到我的 API 服务器。
问题是我的订阅 $connected
仅从计时器可观察到的第一个值捕获然后不起作用。在 hasInternetConnection
主题更改为 false
并返回到 true
后,我的订阅再次获得第一个值,然后什么都没有。这是我在控制台中看到的内容:
PING: true
subscription tap
PING: true
PING: false
PING: true
...
我该如何解决?谢谢!
完整解决方案:
private hasInternetConnection: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(navigator.onLine);
private connectedSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(true);
private recheckConnectionSubject: Subject<void> = new Subject<void>();
constructor(
private readonly http: HttpClient,
) {
fromEvent(window, 'online')
.pipe(takeUntil(this.destroyed))
.subscribe(() => {
this.hasInternetConnection.next(true);
});
fromEvent(window, 'offline')
.pipe(takeUntil(this.destroyed))
.subscribe(() => {
this.hasInternetConnection.next(false);
});
merge(
this.hasInternetConnection,
this.recheckConnectionSubject,
)
.pipe(
mapTo(this.hasInternetConnection.value),
switchMap((connected: boolean) => {
if (!connected) {
return of(connected);
} else {
return timer(0, 30000)
.pipe(
mergeMapTo(this.http.get(`${environment.apiRoot}/ping`, { responseType: 'text' })
.pipe(
map((res) => {
return true;
}),
catchError(() => {
return of(false);
})
)
),
);
}
})
)
.subscribe(this.connectedSubject);
}
public get $connected(): Observable<boolean> {
return this.connectedSubject.asObservable()
.pipe(
distinctUntilChanged(),
);
}
public resetTimer(): void {
this.recheckConnectionSubject.next();
}