RXJS flatMap 到重复可观察对象

RXJS flatMap to repetitive observable

我正在尝试实现服务,如果应用程序连接到我的服务器,它会提供可观察的服务,因此当浏览器在线时,我们会使用计时器 ping 服务器。这是代码:

public get $connected(): Observable<boolean> {
    return this.hasInternetConnection
               .asObservable()
               .pipe(
                 distinctUntilChanged(),
                 flatMap((connected: boolean) => {
                   if (!connected) {
                     return of(connected);
                   } else {
                     return timer(5000)
                       .pipe(
                         map(() => {
                           var success = Math.random() > 0.5;
                           console.log('PING: ' + success);
                           return success;
                         })
                       );
                   }
                 })
               );
  }

hasInternetConnection 只是绑定到 window onlineoffline 事件的 BehaviorSubject,定时器模拟 ping 到我的 API 服务器。

问题是我的订阅 $connected 仅从计时器可观察到的第一个值捕获然后不起作用。在 hasInternetConnection 主题更改为 false 并返回到 true 后,我的订阅再次获得第一个值,然后什么都没有。这是我在控制台中看到的内容:

PING: true
subscription tap
PING: true
PING: false
PING: true
...

我该如何解决?谢谢!

完整解决方案:

  private hasInternetConnection: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(navigator.onLine);
  private connectedSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(true);
  private recheckConnectionSubject: Subject<void> = new Subject<void>();

  constructor(
    private readonly http: HttpClient,
  ) {
    fromEvent(window, 'online')
      .pipe(takeUntil(this.destroyed))
      .subscribe(() => {
        this.hasInternetConnection.next(true);
      });
    fromEvent(window, 'offline')
      .pipe(takeUntil(this.destroyed))
      .subscribe(() => {
        this.hasInternetConnection.next(false);
      });
    merge(
      this.hasInternetConnection,
      this.recheckConnectionSubject,
    )
      .pipe(
        mapTo(this.hasInternetConnection.value),
        switchMap((connected: boolean) => {
          if (!connected) {
            return of(connected);
          } else {
            return timer(0, 30000)
              .pipe(
                mergeMapTo(this.http.get(`${environment.apiRoot}/ping`, { responseType: 'text' })
                               .pipe(
                                 map((res) => {
                                   return true;
                                 }),
                                 catchError(() => {
                                   return of(false);
                                 })
                               )
                ),
              );
          }
        })
      )
      .subscribe(this.connectedSubject);
  }

  public get $connected(): Observable<boolean> {
    return this.connectedSubject.asObservable()
               .pipe(
                 distinctUntilChanged(),
               );
  }

  public resetTimer(): void {
      this.recheckConnectionSubject.next();
  }