如果其中一个可观察的停止发出事件,为什么 Observable.race 不工作?

Why Observable.race not working if one of observable stop emit events?

如果互联网连接丢失,我想在 webapp 中实现 websocket 重新连接。为了检测互联网丢失,我使用乒乓方法,这意味着我从客户端发送 ping 消息和服务器 returns me pong-message。

加载 webapp 后,我发送 init ping 消息并开始在套接字上收听回复,例如:

this.websocket.onmessage = (evt) => {
      try {

        const websocketPayload: any = JSON.parse(evt.data);

        if (websocketPayload.pong !== undefined && websocketPayload.pong == 1) {

          this.pingPong$.next('pong');
        }

这意味着互联网连接正常,我们可以继续。我还有以下代码:

 Observable.race(
      Observable.of('timeout').delay(5000).repeat(),
      this.pingPong$
    ).subscribe((data) => {
      console.log("[ping-pong]:", data);
      if (data == 'pong') {
        Observable.interval(5000).take(1).subscribe(() => {
          console.log("[ping-pong]:sending ping")
          this.send({ping:1})
        });
      } else if (data == 'timeout'){
        // show reconnect screen and start reconnect
        console.error("It looks like websocket connection lost");
      }
    });

但是! 当 this.pingPong$ 主题停止发出事件时 - .next() 不会发生,因为当我手动断开连接时我们无法获得响应 - 我认为在 Observable.race 中将发出此 observable

Observable.of('timeout').delay(5000).repeat()

但如果 this.pingPong$ 停止发射,我的订阅就不会发生。

为什么?

谢谢

race 选择并订阅第一个发出的 Observable。

因此,如果您的 this.pingPong$ 开始发射然后停止,这没有任何区别,因为 race 一直订阅 this.pingPong$。其他 Observables 不再重要。您可能希望从 this.pingPong$ 发出一个值并重复整个过程。例如像下面这样:

Observable.race(
    Observable.of('timeout').delay(5000).repeat(),
    this.pingPong$
  )
  .pipe(
    take(1), // complete the chain immediately
    repeat() // resubscribe after take(1) completes the chain
  )
  .subscribe(...);

显然这主要取决于你想做什么,但我希望你能明白这一点。