尝试轮询服务器但是 rxjs 方法可能不正确,因为算法中断

attempting to poll a server however rxjs methods may be incorrect as algorithm breaks

所以我正在尝试轮询我的服务器。我试图每 5 秒轮询一次我的服务器。一分钟后投票超时。我在调试方法中有一些控制台日志,但即使在等待 5 秒后仍然触发的唯一 console.log 是 'validate stream start'

我按照 this 教程进行了操作,但没有创建单独的服务,因为我的应用程序中的这一页只需要它。

我敢打赌这是对这些 rxjs 运算符工作方式的简单错误或误解。

我做错了什么?

startastream(){
    this.startastreamboolan = true;
    let count = 12;
    this.http.get(environment.shochat_content_creator_set_valid_stream_start)
      .subscribe((req: any)=>{
        console.log('validate stream start');
        timer(5000).pipe(
          switchMap(() =>{
            console.log('timer start');
            if(count > 0){
              return this.http.get(environment.check_if_stream_is_active_on_mux)
                .subscribe(
                  (req: any)=>{
                    this.streamready = true;
                    return 0;
                  },
                  error => {
                    count = count -1;
                    console.log(count);

                  });
            }
          }));
      });
  } 

定时器

创建一个 Observable,它在 dueTime 后开始发射,并在之后的每个 period 时间后发射不断增加的数字。

timer(dueTime: number | Date = 0, periodOrScheduler?: number | SchedulerLike, scheduler?: SchedulerLike): Observable<number>

Its like interval, but you can specify when should the emissions start.

在您的代码中,您忘记了 timer 运算符的第二个参数,这意味着它只会等待 5s 然后只发出一次。此外,您没有订阅 timer,这就是它不再继续的原因。如果你想每隔 5s 轮询你的服务器,你需要使用 :

timer(0, 5000)

在这里,计时器不会等待,而是会每隔 5s 直接开始发射值。


接下来,我看到您正在使用 switchMap 创建内部流,这很好。但是,如果您的服务器处理请求的时间超过 5s,您可能需要使用 mergeMap,因为它不会取消之前正在进行的内部流,这取决于您。

这里的错误是switchMap(或mergeMap)接受了一个回调函数,必须 return一个Observable,然后运营商会自己订阅它。在这里,您 return 正在使用 Subscription

以下是您可以执行的操作:

const start$ = this.http.get(startUrl).pipe(
  tap(() => console.log('Stream start'))
);
const poll$ = this.http.get(pollUrl).pipe(
  tap(() => (this.streamReady = true)),
  catchError(error => {
    console.log(error);
    return EMPTY;
  })
);

start$.pipe(
  switchMap(() => timer(0, 5000).pipe(
    tap(() => console.log('Polling every 5s')),
    mergeMap(() => poll$)
  ))
).subscribe();

我在这里创建了两个 Observablesstart$ 负责启动流,poll$ 负责轮询您的服务器。这里的想法是启动流,然后切换到 timer 内部流,它将每隔 5s 发出一次,然后再次切换到另一个将轮询服务器的内部流。

我没有在我的示例中包含 count,因此使用此代码流将永远 运行。为此,您应该查看 takeUntil 运算符。

希望对您有所帮助,欢迎提问!