RXJS 扩展从不退出递归调用

RXJS expand never exit from recursive call

我在将文件上传到 firebase 后创建了对侦听 GCP 触发器日志的递归调用,这基本上运行良好。 我的问题是,我的递归扩展函数永远不会退出。 正如您从代码中看到的那样,我不止一次检查了这些值,似乎一切都很好,我也在控制台日志中看到了 ENTERED 消息,但是递归调用从未完成,没有任何错误。 如果条件为真,我想中断或强制中断递归调用,我该怎么办?

public getLog(filePath: string): Observable<object[]> {
    try {
      ...
        return this.getLogChunk()
          .pipe(
            expand((data: any) => {

              if (!environment.production && data && data.entries && data.entries.length > 0) {
                console.groupCollapsed('GCP Service Info [getLog]');
                console.info('[fileName]', fileName);
                console.info('[some - Finished]', data.entries.some((x: any) => x.textPayload.includes('Finished')));
                console.info('[some - Filename]', data.entries.some((x: any) => x.textPayload.includes(fileName)));
                console.info('[some - Finished - Filename]', data.entries.some((x: any) => x.textPayload.includes('Finished') && x.textPayload.includes(fileName)));
                console.info('[filter - Filename]', data.entries.filter((x: any) => x.textPayload.includes(fileName)));
                console.groupEnd();
              }

              if (data &&
                data.entries &&
                data.entries.some((x: any) => x.textPayload.includes('Finished') && x.textPayload.includes(fileName))) {
                console.log('ENTERED!!!!!');
                return of({});
              }
              return this.getLogChunk().pipe(delay(2000));

            }),
            map(res => res),
            reduce((acc: object[], val: any) => acc.concat(val), new Array<object>())
          )
      }
      return new Observable<object[]>();
    } catch (e) {
      if (!environment.production) {
        console.groupCollapsed('GCP Service Error [getLog]');
        console.error('[error]', e);
        console.groupEnd();
      }
      throw new Error(e);
    }
  }

在我调用 getlog 的地方,(你可以看到我检查了快照更改以仅触发一次 getlog,上传完成后,我对其进行了调试,没问题) 因此[complete] ENTERED 从未触发,因为 getlog 中的无限递归循环。

...
const uploadTask: AngularFireUploadTask = this.storage.upload(filePath, fileToUpload);
...
uploadTask.snapshotChanges()
              .pipe(
                filter(task => {
                  if (task) {
                    return task.state === firebase.storage.TaskState.SUCCESS
                  }
                  return false;
                })
              )
              .subscribe(val => {
                this.gcpService.getLog(filePath)
                  .subscribe({
                    next: data => console.log('[next] ENTERED'),
                    complete: () => {
                      console.log('[complete] ENTERED');
                    }
                  });
              })
...

我发现问题出在 return of({}) 调用中 - 这也扩展了,因为它提供了一个值。如果您希望扩展结束,请改为调用 return EMPTY(从 'Rxjs' 导入)。 EMTPY 没有 return 值并立即完成,因此 expand 不会继续监听。