Angular 11 + RxJS:concatMap 为每个数组项调用 ReplaySubject Observable 并最终确定不工作

Angular 11 + RxJS: concatMap for each array item to call ReplaySubject Observable and finalize not working

我不是 Rxjs 专家。我有以下代码。我正在尝试上传多个文件,对于每个文件,我想进行 base64 转换并将所有结果放入最终数组。下面的代码不起作用, finalize 根本没有被调用。谁能帮我看看我的代码有什么问题?

private validateSelectedFiles(files: FileList): void {
  this.spinnerService.show();
  from(files)
    .pipe(
      concatMap(f => this.convertFile(f)),
      finalize(() => {
        console.log('finalize'); //not coming here
        this.spinnerService.hide();
      }),
      scan((acc, curr) => {
        acc.push(curr)
        return acc;
      }, [])
    ).subscribe(result => {
      console.log('result', result); //Only once it is coming here
    })
}

private convertFile(file: File): Observable<string> {
  const result  = new ReplaySubject<string>(1);
  const reader = new FileReader();
  reader.readAsBinaryString(file);
  reader.onload = (event) => {
    result.next(btoa(event.target.result.toString()))
  };
  return result;
}

预期功能:

  1. 对于每个上传的文件,将其转换为base64
  2. 获取base64转换后的结果数组做进一步处理

以下代码对我有用:

private convertAttachmentsToBase64(): void {
        forkJoin(Array.from(this.addedFiles).map(file => this.convertFile(file)))
        .pipe(
            takeUntil(this.destroy$),
            finalize(() => this.spinnerService.hide()),
        )
        .subscribe((result) => this.attachmentsToUpload = result);      
    }   



// Convert file to Base64   
    private convertFile(file: File): Observable<AttachmentToUpload> {
        const result  = new Subject<AttachmentToUpload>();
        const reader = new FileReader();
        reader.readAsBinaryString(file);
        reader.onload = (event) => {
            result.next({
                fileName: file.name,
                data: btoa(event.target.result.toString())
            })
            result.complete();
        };
        return result;
    }   

我可以提出以下建议

  1. 使用 RxJS forkJoin + Array#from + Array#map 代替 from + scan 组合。
  2. 使用 new Observable() 创建 observable 而不是 ReplaySubject
  3. 目前我在问题中没有看到 HTTP 请求。我使用 switchMap 运算符添加了它。
  4. 我不确定 convertFile() 是否真的将文件转换为 Base64。作为替代方案,您可以尝试 解决方案。
  5. 重要提示:forkJoin 仅当所有源可观察对象都发出时才会发出。因此,如果 onload 函数未被 FileReader() 触发,则 forkJoin 将不会发出并且 finalizesubscribe 将不会被触发。
private validateSelectedFiles(files: FileList): void {
  forkJoin(Array.from(files).map(file => this.convertFile(file))).pipe(
    switchMap((filesB64) => this.http.post('some url')),
    finalize(() => this.spinnerService.hide())
  ).subscribe({
    next: (result: any) => {
      // handle response
    },
    error: (error: any) => {
      // handle errors
    }
  });
}

private convertFile(file: File): Observable<string> {
  return new Observable((observer: any) => {
    const reader = new FileReader();
    reader.onload = (event: any) => {
      observer.next(btoa(event.target.result.toString()));
      observer.complete();
    };
    reader.readAsBinaryString(file);
  });
}

更新:为 FileList 添加 Array#from()(来源:

finalize() 是一个 tear-down 处理程序,在链终止时调用(在源 Observable 完成、错误或链被取消订阅后)。

在你的例子中,你正在使用 concatMap() 将另一个 Observable 合并到链中。但是,您正在合并永远不会自行完成的 ReplaySubject,因此链永远不会终止,因此永远不会调用 finalize()

看起来您甚至不需要使用 ReplaySubject,因此您可以在加载文件后立即完成 Subject

private convertFile(file: File): Observable<string> {
  const result  = new Subject<string>();
  const reader = new FileReader();
  reader.readAsBinaryString(file);
  reader.onload = (event) => {
    result.next(btoa(event.target.result.toString()));
    result.complete();
  };
  return result;
}