如何在推送到主题之前比较值 - 单一数据源

How to compare values before pushing to subject - single data source

考虑一个包含对象数组的 BehaviorSubject。 该主题应该同时在多个地方订阅,即同一主题的多个订阅。 它通过轮询从后端接收数据。由于轮询相同的 api 会产生相同的数据,因此我使用 BehaviorSubject.value 将来自后端的新响应与 Subject 的现有数据进行比较,如下面的演示所示

https://stackblitz.com/edit/9ih9mv?file=index.ts

我的队友看到了 Ben Leah 对此话题的评论

根据@BenLesh 在此线程上的回答,我的团队强烈反对使用 .value 或 .getValue() 进行数据比较。 如果我有一个服务,其主题在加载 class 时被初始化,而在 class 被销毁(ngOnDestroy)时被取消订阅,那么在调用 .next() 之前使用 .value 比较数据是否有任何问题?

仅供参考,我尽量不使用 distinctUntilChanged,因为如果我有 1 个主题订阅多个订阅,它将触发多次比较(即每个订阅 1 次)。

您可以使用 pairwise() 运算符并比较两个值

文档:https://rxjs.dev/api/operators/pairwise

concatMap(() => this.getDataFromBackend()),
pairwise(),
switchMap(([oldResponse, newResponse]) => {
    // ...logic
    if (!Lodash.isEqual(oldResponse, newResponse)) {
        this.subject.next(newResponse);
    }
})

演示:https://stackblitz.com/edit/jmumnz-amtlq4?file=index.ts

Ben Lesh 回答的症结在于,使用 .value 表明您没有充分利用 RxJS。

If you're using getValue() you're doing something imperative in declarative paradigm.

在较小程度上,一般情况下主题也是如此。它们通常用于两个目的之一。多播,或命令式代码和声明式代码之间的桥梁。

这里您只需要多播组件。在大多数情况下,您可以使用操作员(他们在后台使用主题)为您完成此操作。

您在这里的许多歌舞都是以声明方式实现的distinctUntilChanged。通过这样做,您创建了一个既慢得多(在这里应该无关紧要)又更难维护(在这里应该重要)的版本。

以下是我如何重构您的代码(使用 shareReplaydistinctUntilChanged),使其更符合教条式的 RxJS。


interface Something {
  length: number
}

class SomeService implements OnInit, OnDestroy {

  /* Errors are "false", Data without a length is "undefined", and 
    everything else is "something". I wouldn't reccomend this,
    but as an example, sure.
  */
  private dataOb$: Observable<(Something | Boolean | undefined)[]>
  private pollingSubscription: Subscription;

  constructor(private readonly httpClient: HttpClient) {

  }

  ngOnInit() {
    this.dataOb$ = timer(0,1000).pipe(
      concatMap(() => this.getDataFromBackend()),
      distinctUntilChanged(Lodash.isEqual),
      shareReplay(1) // multicasting
    )

    // This service is effectively "using" itself. This means
    // the polling continues even if nobody else is listening.
    this.pollingSubscription = this.getDataObs().subscribe()
  }

  private getDataFromBackend(): Observable<(Something | Boolean | undefined)[]> {
    // This is a bizzaar function, but I assume it's just as an example
    return this.httpClient.get(url, options).pipe(
      map((response: Something[]) => {
        if (response?.length > 0) {
          return response;
        }
        return undefined;
      }),
      catchError(() => of(false))
    )
  }
  
  // I changed this from a private method
  getDataObs(): Observable<(Something | Boolean | undefined)[]> {
    return this.dataOb$
  }

  ngOnDestroy() {
    this.pollingSubscription.unsubscribe();
  }
}

顺便说一句:

Array<int>int[]
相同 Array<int|boolean> 等同于 (<int|boolean>)[]

更新

如果您想(例如)忽略错误和空排放:

private getDataFromBackend(): Observable<Something[]> {
  return this.httpClient.get<Something[]>(url, options).pipe(
    filter(response => response?.length > 0),
    catchError(() => EMPTY)
  )
}