如何在 RXJS 管道中优雅地组合两个 Observable
how to gracefully combine two Observables inside RXJS pipe
我正在使用 Angular 构建日志查看器。
当用户进入页面时,我需要加载历史日志并开始监视新日志。
用户可以使用发出 query
对象的简单表单过滤日志。每次 query
更改过程重新启动(即“旧”结果被逐出,加载新历史记录并启动新“实时”流)。
我可以通过两种方式完成,但我都不满意。
第一个我觉得比较容易理解但是违反了DRY原则
const historicalLogs = this.querySubject
.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(
query => this.deviceLogsService.getLogsBefore(query, moment())
)
);
const futureLogs = this.querySubject
.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(
query => timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
)
);
merge(historicalLogs, futureLogs)
.subscribe(newLogs => {
this.logs.push(...newLogs);
this.scrollToVeryBottom();
});
第二个不违反DRY但是以后恐怕很难read/analyze:
this.querySubject
.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(query => concat([
this.deviceLogsService.getLogsBefore(query, moment()),
timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
]).pipe(mergeAll()))
)
.subscribe(newLogs => {
this.logs.push(...newLogs);
this.scrollToVeryBottom();
});
对于以更具可读性和优雅的方式实现它的任何建议,我将不胜感激。
您尝试过使用 combineLatest
吗?:
obs$.pipe(
...,
switchMap(query => combineLatest([
this.deviceLogsService.getLogsBefore(query, moment()),
timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
])),
map(([logsBefore, logsAfter]) => {...})
)
一种简化方法是创建几个函数来获取您的未来和过去的日志:
private getHistoricalLogs(query) {
return this.deviceLogsService.getLogsBefore(query, moment());
}
private pollForFutureLogs(query) {
return timer(1000, 2000).pipe(
switchMap(() => this.deviceLogsService.getLogsAfter(query, moment()))
);
}
this.querySubject.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(query => concat([
this.getHistoricalLogs(query),
this.pollForFutureLogs(query)
]))
.subscribe(newLogs => {
this.logs.push(...newLogs);
this.scrollToVeryBottom();
});
与其在“流外”维护一个单独的 logs
变量,不如简单地在 switchMap
内部发出一个空数组,然后使用 [=15= 将排放累积到一个数组中]:
logs = this.querySubject.pipe(
debounceTime(250),
switchMap(query => concat([
of([]),
this.getHistoricalLogs(query),
this.pollForFutureLogs(query)
])),
scan((all, logs) => all.concat(logs), [])
);
logs.subscribe(() => this.scrollToVeryBottom());
我正在使用 Angular 构建日志查看器。
当用户进入页面时,我需要加载历史日志并开始监视新日志。
用户可以使用发出 query
对象的简单表单过滤日志。每次 query
更改过程重新启动(即“旧”结果被逐出,加载新历史记录并启动新“实时”流)。
我可以通过两种方式完成,但我都不满意。
第一个我觉得比较容易理解但是违反了DRY原则
const historicalLogs = this.querySubject
.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(
query => this.deviceLogsService.getLogsBefore(query, moment())
)
);
const futureLogs = this.querySubject
.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(
query => timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
)
);
merge(historicalLogs, futureLogs)
.subscribe(newLogs => {
this.logs.push(...newLogs);
this.scrollToVeryBottom();
});
第二个不违反DRY但是以后恐怕很难read/analyze:
this.querySubject
.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(query => concat([
this.deviceLogsService.getLogsBefore(query, moment()),
timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
]).pipe(mergeAll()))
)
.subscribe(newLogs => {
this.logs.push(...newLogs);
this.scrollToVeryBottom();
});
对于以更具可读性和优雅的方式实现它的任何建议,我将不胜感激。
您尝试过使用 combineLatest
吗?:
obs$.pipe(
...,
switchMap(query => combineLatest([
this.deviceLogsService.getLogsBefore(query, moment()),
timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
])),
map(([logsBefore, logsAfter]) => {...})
)
一种简化方法是创建几个函数来获取您的未来和过去的日志:
private getHistoricalLogs(query) {
return this.deviceLogsService.getLogsBefore(query, moment());
}
private pollForFutureLogs(query) {
return timer(1000, 2000).pipe(
switchMap(() => this.deviceLogsService.getLogsAfter(query, moment()))
);
}
this.querySubject.pipe(
debounceTime(250),
tap(() => this.logs = []),
switchMap(query => concat([
this.getHistoricalLogs(query),
this.pollForFutureLogs(query)
]))
.subscribe(newLogs => {
this.logs.push(...newLogs);
this.scrollToVeryBottom();
});
与其在“流外”维护一个单独的 logs
变量,不如简单地在 switchMap
内部发出一个空数组,然后使用 [=15= 将排放累积到一个数组中]:
logs = this.querySubject.pipe(
debounceTime(250),
switchMap(query => concat([
of([]),
this.getHistoricalLogs(query),
this.pollForFutureLogs(query)
])),
scan((all, logs) => all.concat(logs), [])
);
logs.subscribe(() => this.scrollToVeryBottom());