如何使用 RxJS 以时间间隔调用多个依赖 api 调用

How to call multiple dependent api calls with time intervals using RxJS

我正尝试在 angular 11 中为这样的场景编写代码 -

我有文件列表,对于我点击的每个文件 api(比如 api1),我从响应中获取一个文件 ID,然后将其传递给另一个 api (比如说 api2),我想每 3 秒继续点击 api2,除非我在响应中没有得到 status="available"。一旦我获得可用状态,我不再需要为该 fileId 点击 api2,我们可以开始循环处理下一个文件。

我拥有的每个文件的整个过程。

我知道我们可以使用像 mergeMap 或 switchMap 这样的 rxjs 运算符来实现这一点(因为现在顺序对我来说并不重要)。但我对 rxjs 还很陌生,不知道如何将它们组合在一起。

这就是我现在正在做的事情 -

this.filesToUpload.forEach((fileItem) => {
      if (!fileItem.uploaded) {
        if (fileItem.file.size < this.maxSize) {
          self.fileService.translateFile(fileItem.file).then( //hit api1
            (response) => {
              if (response && get(response, 'status') == 'processing') {
               //do some processing here 
               this.getDocumentStatus(response.fileId);
              } 
            },
            (error) => {
              //show error
            }
          );
        }
      }
   }); 
getDocumentStatus(fileId:string){
    this.docStatusSubscription = interval(3000)   //hitting api2 for every 3 seconds 
    .pipe(takeWhile(() => !this.statusProcessing))
    .subscribe(() => {
      this.statusProcessing = false;
      this.fileService.getDocumentStatus(fileId).then((response)=>{
        if(response.results.status=="available"){
          this.statusProcessing = true;
          //action complete for this fileId
        }
      },(error)=>{

      });
      
    })
    
  }

根据您所追求的描述,我可能会这样做。

  1. 创建一个包含您要进行的所有调用的可观察对象列表。
  2. 将列表连接在一起
  3. 订阅

使这项工作成功的原因是我们只订阅一次(不是每个文件一次),我们让操作员处理其他所有内容的订阅和取消订阅。

然后在我们订阅之前什么也不会发生。这样 concat 就可以为我们完成繁重的工作。没有必要自己用 this.statusProessing 之类的变量来跟踪任何东西。这一切都为我们处理了!这样不容易出错。

// Create callList. This is an array of observables that each hit the APIs and only
// complete when status == "available".
const callList = this.filesToUpload
  .filter(fileItem => !fileItem.uploaded && fileItem.file.size < this.maxSize)
  .map(fileItem => this.createCall(fileItem));

// concatenate the array of observables by running each one after the previous one
// completes.
concat(...callList).subscribe({
  complete: () => console.log("All files have completed"),
  error: err => console.log("Aborted call list due to error,", err)
});
createCall(fileItem: FileItemType): Observable<never>{
  // Use defer to turn a promise into an observable 
  return defer(() => this.fileService.translateFile(fileItem.file)).pipe(

    // If processing, then wait untill available, otherwise just complete
    switchMap(translateFileResponse => {
      if (translateFileResponse && get(translateFileResponse, 'status') == 'processing') {
        //do some processing here 
        return this.delayByDocumentStatus(translateFileResponse.fileId);
      } else {
        return EMPTY;
      }
    }),
    // Catch and then rethrow error. Right now this doesn't do anything, but If 
    // you handle this error here, you won't abort the entire call list below on 
    // an error. Depends on the behaviour you're after.
    catchError(error => {
      // show error
      return throwError(() => error);
    })

  );
}
delayByDocumentStatus(fileId:string): Observable<never>{
  // Hit getDocumentStatus every 3 seconds, unless it takes more
  // than 3 seconds for api to return response, then wait 6 or 9 (etc)
  // seconds.
  return interval(3000).pipe(
    exhaustMap(_ => this.fileService.getDocumentStatus(fileId)),
    takeWhile(res => res.results.status != "available"),
    ignoreElements(),
    tap({
      complete: () => console.log("action complete for this fileId: ", fileId)
    })
  );
}