如何链接 RxJS 可观察列表

How to chain a list of RxJS observable

对单页应用程序使用 Angular CLI 6

我需要做的是以下几点 发出 HTTP post 请求。我会得到一个好的回报。结果或副作用需要一段时间才能准备好进行处理。 我将不得不轮询结果准备情况,比如说每秒一次。

要轮询结果,我需要进行 HTTP get 调用并检查结果。如果结果完整,我就完成了。 不然我还要继续投票..

我所做的是,我有两个可观察对象,一个用于 HTTP post,一个用于 HTTP get 调用。 我使用 setTimeout 进行轮询。这个代码的组织,当我点击 setTimeout 时,我永远不会登陆我必须杀死应用程序...

关于这个问题有什么提示吗?

我目前所掌握的是

private initiateAnalysis(){
    this.initiateRequest$()
    .subscribe(response =>{
            const error = getErrorMessage(response);
            if (error !== null) {
                console.error(error);
            } else {
                this.processResults();
            }
        },
        (err: HttpErrorResponse) =>{
            console.error('feature error:', err);
        });
}

private initiateRequest$(): Observable<any>{
    let params: any = {
    };
    return this.problemsService.postRequest('postURL', {}, params)
}

private checkForResponse$(): Observable<any>{
    let params: any = {
    };

    return this.problemsService.getResults('someURL', params);
}

private processResults(){
    this.doneWithNecRiskAnalysis = false;
    while (!this.doneWithNecRiskAnalysis) {
        setTimeout(() =>{
            this.checkForResults();   // I never to this line in the code...
        }, 1000);
    }
}

private checkForResults() {
    this.checkForResponse$()
    .subscribe(response =>{
            const error = getErrorMessage(response);
            if (error !== null) {
                console.error(error);
            } else {
                if (1 === 1) { // just for now
                    this.showResults(response.payload);
                }
            }
        },
        (err: HttpErrorResponse) =>{
            console.error('feature error:', err);
        });
}

private showResults(results) {
    console.log('results', results);
}
while (!this.doneWithNecRiskAnalysis) {
    setTimeout(() =>{
        this.checkForResults();   // I never to this line in the code...
    }, 1000);
}

这会不断要求服务器在 1 秒内检查结果。你永远不会等到下一秒再问。

为了使您的代码更简单、更清晰,我要做的第一件事就是重构您的服务(and/or 您的后端),以便它们的可观察对象发出 error 如果出现错误,即使有错误消息也不会发出正常消息。

剩下的会假设你已经完成了。

您也应该停止使用 any 类型。

那么代码可以简化为:

  private initiateAnalysis() {
    this.initiateRequest$().pipe( // sends the first request to start the analysis
      switchMap(() => interval(1000)), // when the response comes back, start emitting an event every second from this observable
      mergeMap(() => this.checkForResponse$()), // each second, send the GET request and emit the results. Merge all the results into the resulting observable
      filter(results => this.isAnalysisComplete(results)), // filter out the results if they are not the final, correct results
      first() // only take the first complete results, to avoid continuing sending GET requests
    ).subscribe(results => this.showResults(results));
  }

  private initiateRequest$(): Observable<void> {
    const params = {
    };
    return this.problemsService.postRequest('postURL', {}, params)
  }

  private checkForResponse$(): Observable<Results>{
    const params = {
    };

    return this.problemsService.getResults('someURL', params);
  }

  private showResults(results: Results) {
    console.log('results', results);
  }

如果您想在发送下一个请求之前等待上一个响应,您可能更喜欢使用 concatMap() 而不是 mergeMap()

这是一个演示,其中实际的 HTTP 请求被随机延迟和随机响应所取代:https://stackblitz.com/edit/angular-bt17fb?file=src%2Fapp%2Fapp.component.ts