RXJS switchmap + tap 操作符
RXJS switchmap + tap like operator
我有一个文件流,我想填充有关它的附加信息,但我想向用户展示当前获得的数据,因为它是最初可见的所有内容。
我希望观察到:
- 取消新的发布(例如
switchMap
)
- 在发射之前不等待可观察对象完成(如
tap
)
我目前正在等待结果,然后再发出文件。
设置和当前尝试迭代:
this.pagedFLFiles = fileService.getFiles().pipe(
switchMap(response => concat(
of(response),
fileService.getAdditionalInfo(response.items).pipe(
switchMap(() => EMPTY),
),
)),
shareReplay(1),
);
fileService.getAdditionalInfo(response.items)
- 正在修改数据
getAdditionalInfo(files: FLFile[]): Observable<FLFile[]> {
return this.api.getWithToken(token => {
return { path: `v5/user/${token}/files/${files.map(file => file.id).join(',')}}/facilities` };
}).pipe(
map(information => {
files.forEach(file => {
const info = information[file.id];
(Object.entries(info) as [keyof typeof info, any][]).forEach(([key, value]) => {
file[key] = value;
});
});
return files;
}),
);
}
Concat 在发出值之前等待 observables of(reponse) 和 getAdditionalInfo。
Merge 每次它的一个 observable 发出时发出。
示例:
getFiles 将每秒发出 3 秒
getAdditionalInfo 将被取消 2 次(因为它运行时间超过 1 秒),因此只会修改最后发出的文件数组
import { merge, EMPTY, timer, of, interval } from 'rxjs';
import { finalize, switchMap, map, take, shareReplay } from 'rxjs/operators';
const fileService = {
getFiles: () => interval(1000).pipe(
take(3),
map(x => {
const items = [0, 1, 2].map(i => { return { 'info1': i }; })
return { 'index': x, 'items': items };
})
),
getAdditionalInfo: (files) => {
let wasModified = false;
return timer(2000).pipe(
map(information => {
files.forEach(file => {
file['info2'] = 'information' + files.length;
});
console.log('getAdditionalInfo: modified data');
wasModified = true;
return files;
}),
finalize(() => {
if (!wasModified) {
console.log('getAdditionalInfo: cancelled');
}
})
);
}
}
const pagedFLFiles = fileService.getFiles().pipe(
switchMap(response => {
return merge(
of(response),
fileService.getAdditionalInfo(response.items).pipe(
switchMap(() => EMPTY),
));
}
),
shareReplay(1),
);
pagedFLFiles.subscribe(x => {
console.log('immediate', x.index);
});
我有一个文件流,我想填充有关它的附加信息,但我想向用户展示当前获得的数据,因为它是最初可见的所有内容。
我希望观察到:
- 取消新的发布(例如
switchMap
) - 在发射之前不等待可观察对象完成(如
tap
)
我目前正在等待结果,然后再发出文件。
设置和当前尝试迭代:
this.pagedFLFiles = fileService.getFiles().pipe(
switchMap(response => concat(
of(response),
fileService.getAdditionalInfo(response.items).pipe(
switchMap(() => EMPTY),
),
)),
shareReplay(1),
);
fileService.getAdditionalInfo(response.items)
- 正在修改数据
getAdditionalInfo(files: FLFile[]): Observable<FLFile[]> {
return this.api.getWithToken(token => {
return { path: `v5/user/${token}/files/${files.map(file => file.id).join(',')}}/facilities` };
}).pipe(
map(information => {
files.forEach(file => {
const info = information[file.id];
(Object.entries(info) as [keyof typeof info, any][]).forEach(([key, value]) => {
file[key] = value;
});
});
return files;
}),
);
}
Concat 在发出值之前等待 observables of(reponse) 和 getAdditionalInfo。
Merge 每次它的一个 observable 发出时发出。
示例: getFiles 将每秒发出 3 秒 getAdditionalInfo 将被取消 2 次(因为它运行时间超过 1 秒),因此只会修改最后发出的文件数组
import { merge, EMPTY, timer, of, interval } from 'rxjs';
import { finalize, switchMap, map, take, shareReplay } from 'rxjs/operators';
const fileService = {
getFiles: () => interval(1000).pipe(
take(3),
map(x => {
const items = [0, 1, 2].map(i => { return { 'info1': i }; })
return { 'index': x, 'items': items };
})
),
getAdditionalInfo: (files) => {
let wasModified = false;
return timer(2000).pipe(
map(information => {
files.forEach(file => {
file['info2'] = 'information' + files.length;
});
console.log('getAdditionalInfo: modified data');
wasModified = true;
return files;
}),
finalize(() => {
if (!wasModified) {
console.log('getAdditionalInfo: cancelled');
}
})
);
}
}
const pagedFLFiles = fileService.getFiles().pipe(
switchMap(response => {
return merge(
of(response),
fileService.getAdditionalInfo(response.items).pipe(
switchMap(() => EMPTY),
));
}
),
shareReplay(1),
);
pagedFLFiles.subscribe(x => {
console.log('immediate', x.index);
});