RXJS switchmap + tap 操作符

RXJS switchmap + tap like operator

我有一个文件流,我想填充有关它的附加信息,但我想向用户展示当前获得的数据,因为它是最初可见的所有内容。

我希望观察到:

我目前正在等待结果,然后再发出文件。

设置和当前尝试迭代:

this.pagedFLFiles = fileService.getFiles().pipe(
    switchMap(response => concat(
        of(response),
        fileService.getAdditionalInfo(response.items).pipe(
            switchMap(() => EMPTY),
        ),
    )),
    shareReplay(1),
);

fileService.getAdditionalInfo(response.items) - 正在修改数据

getAdditionalInfo(files: FLFile[]): Observable<FLFile[]> {
    return this.api.getWithToken(token => {
        return { path: `v5/user/${token}/files/${files.map(file => file.id).join(',')}}/facilities` };
    }).pipe(
        map(information => {
            files.forEach(file => {
                const info = information[file.id];
                (Object.entries(info) as [keyof typeof info, any][]).forEach(([key, value]) => {
                    file[key] = value;
                });
            });
            return files;
        }),
    );
}

使用merge instead of concat.

Concat 在发出值之前等待 observables of(reponse) 和 getAdditionalInfo。

Merge 每次它的一个 observable 发出时发出。

示例: getFiles 将每秒发出 3 秒 getAdditionalInfo 将被取消 2 次(因为它运行时间超过 1 秒),因此只会修改最后发出的文件数组

import { merge, EMPTY, timer, of, interval } from 'rxjs';
import { finalize, switchMap, map, take, shareReplay } from 'rxjs/operators';


const fileService = {
  getFiles: () => interval(1000).pipe(
    take(3),
    map(x => {
      const items = [0, 1, 2].map(i => { return { 'info1': i }; })
      return { 'index': x, 'items': items };
    })
  ),
  getAdditionalInfo: (files) => {
    let wasModified = false;
    return timer(2000).pipe(
      map(information => {
        files.forEach(file => {
          file['info2'] = 'information' + files.length;
        });
        console.log('getAdditionalInfo: modified data');
        wasModified = true;
        return files;
      }),
      finalize(() => {
        if (!wasModified) {
          console.log('getAdditionalInfo: cancelled');
        }
      })
    );
  }
}


const pagedFLFiles = fileService.getFiles().pipe(
  switchMap(response => {
    return merge(
      of(response),
      fileService.getAdditionalInfo(response.items).pipe(
        switchMap(() => EMPTY),
      ));
  }
  ),
  shareReplay(1),
);


pagedFLFiles.subscribe(x => {
  console.log('immediate', x.index);
});

Stackblitz