如何在 RXJS 管道中优雅地组合两个 Observable

how to gracefully combine two Observables inside RXJS pipe

我正在使用 Angular 构建日志查看器。

当用户进入页面时,我需要加载历史日志并开始监视新日志。 用户可以使用发出 query 对象的简单表单过滤日志。每次 query 更改过程重新启动(即“旧”结果被逐出,加载新历史记录并启动新“实时”流)。

我可以通过两种方式完成,但我都不满意。

第一个我觉得比较容易理解但是违反了DRY原则

const historicalLogs = this.querySubject
.pipe(
  debounceTime(250),
  tap(() => this.logs = []),
  switchMap(
    query => this.deviceLogsService.getLogsBefore(query, moment())
  )
);

const futureLogs = this.querySubject
.pipe(
  debounceTime(250),
  tap(() => this.logs = []),
  switchMap(
    query => timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
  )
);

merge(historicalLogs, futureLogs)
.subscribe(newLogs => {
  this.logs.push(...newLogs);
  this.scrollToVeryBottom();
});

第二个不违反DRY但是以后恐怕很难read/analyze:

this.querySubject
  .pipe(
    debounceTime(250),
    tap(() => this.logs = []),
    switchMap(query => concat([
      this.deviceLogsService.getLogsBefore(query, moment()),
      timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
    ]).pipe(mergeAll()))
  )
  .subscribe(newLogs => {
    this.logs.push(...newLogs);
    this.scrollToVeryBottom();
  });

对于以更具可读性和优雅的方式实现它的任何建议,我将不胜感激。

您尝试过使用 combineLatest 吗?:

obs$.pipe(
  ...,
  switchMap(query => combineLatest([
    this.deviceLogsService.getLogsBefore(query, moment()),
    timer(1000, 2000).pipe(mergeMap(t => this.deviceLogsService.getLogsAfter(query, moment())))
  ])),
  map(([logsBefore, logsAfter]) => {...})
)

一种简化方法是创建几个函数来获取您的未来和过去的日志:


private getHistoricalLogs(query) {
  return this.deviceLogsService.getLogsBefore(query, moment());
}

private pollForFutureLogs(query) {
  return timer(1000, 2000).pipe(
    switchMap(() => this.deviceLogsService.getLogsAfter(query, moment()))
  );
}

this.querySubject.pipe(
    debounceTime(250),
    tap(() => this.logs = []),
    switchMap(query => concat([
      this.getHistoricalLogs(query),
      this.pollForFutureLogs(query)
    ]))
  .subscribe(newLogs => {
    this.logs.push(...newLogs);
    this.scrollToVeryBottom();
  });

与其在“流外”维护一个单独的 logs 变量,不如简单地在 switchMap 内部发出一个空数组,然后使用 [=15= 将排放累积到一个数组中]:

logs = this.querySubject.pipe(
    debounceTime(250),
    switchMap(query => concat([
      of([]),
      this.getHistoricalLogs(query),
      this.pollForFutureLogs(query)
    ])),
    scan((all, logs) => all.concat(logs), [])
);

logs.subscribe(() => this.scrollToVeryBottom());