如何在推送到主题之前比较值 - 单一数据源
How to compare values before pushing to subject - single data source
考虑一个包含对象数组的 BehaviorSubject。
该主题应该同时在多个地方订阅,即同一主题的多个订阅。
它通过轮询从后端接收数据。由于轮询相同的 api 会产生相同的数据,因此我使用 BehaviorSubject.value 将来自后端的新响应与 Subject 的现有数据进行比较,如下面的演示所示
https://stackblitz.com/edit/9ih9mv?file=index.ts
我的队友看到了 Ben Leah 对此话题的评论
根据@BenLesh 在此线程上的回答,我的团队强烈反对使用 .value 或 .getValue() 进行数据比较。
如果我有一个服务,其主题在加载 class 时被初始化,而在 class 被销毁(ngOnDestroy)时被取消订阅,那么在调用 .next() 之前使用 .value 比较数据是否有任何问题?
仅供参考,我尽量不使用 distinctUntilChanged,因为如果我有 1 个主题订阅多个订阅,它将触发多次比较(即每个订阅 1 次)。
您可以使用 pairwise() 运算符并比较两个值
文档:https://rxjs.dev/api/operators/pairwise
concatMap(() => this.getDataFromBackend()),
pairwise(),
switchMap(([oldResponse, newResponse]) => {
// ...logic
if (!Lodash.isEqual(oldResponse, newResponse)) {
this.subject.next(newResponse);
}
})
Ben Lesh 回答的症结在于,使用 .value
表明您没有充分利用 RxJS。
If you're using getValue() you're doing something imperative in declarative paradigm.
在较小程度上,一般情况下主题也是如此。它们通常用于两个目的之一。多播,或命令式代码和声明式代码之间的桥梁。
这里您只需要多播组件。在大多数情况下,您可以使用操作员(他们在后台使用主题)为您完成此操作。
您在这里的许多歌舞都是以声明方式实现的distinctUntilChanged
。通过这样做,您创建了一个既慢得多(在这里应该无关紧要)又更难维护(在这里应该重要)的版本。
以下是我如何重构您的代码(使用 shareReplay
和 distinctUntilChanged
),使其更符合教条式的 RxJS。
interface Something {
length: number
}
class SomeService implements OnInit, OnDestroy {
/* Errors are "false", Data without a length is "undefined", and
everything else is "something". I wouldn't reccomend this,
but as an example, sure.
*/
private dataOb$: Observable<(Something | Boolean | undefined)[]>
private pollingSubscription: Subscription;
constructor(private readonly httpClient: HttpClient) {
}
ngOnInit() {
this.dataOb$ = timer(0,1000).pipe(
concatMap(() => this.getDataFromBackend()),
distinctUntilChanged(Lodash.isEqual),
shareReplay(1) // multicasting
)
// This service is effectively "using" itself. This means
// the polling continues even if nobody else is listening.
this.pollingSubscription = this.getDataObs().subscribe()
}
private getDataFromBackend(): Observable<(Something | Boolean | undefined)[]> {
// This is a bizzaar function, but I assume it's just as an example
return this.httpClient.get(url, options).pipe(
map((response: Something[]) => {
if (response?.length > 0) {
return response;
}
return undefined;
}),
catchError(() => of(false))
)
}
// I changed this from a private method
getDataObs(): Observable<(Something | Boolean | undefined)[]> {
return this.dataOb$
}
ngOnDestroy() {
this.pollingSubscription.unsubscribe();
}
}
顺便说一句:
Array<int>
与 int[]
和
相同
Array<int|boolean>
等同于 (<int|boolean>)[]
更新
如果您想(例如)忽略错误和空排放:
private getDataFromBackend(): Observable<Something[]> {
return this.httpClient.get<Something[]>(url, options).pipe(
filter(response => response?.length > 0),
catchError(() => EMPTY)
)
}
考虑一个包含对象数组的 BehaviorSubject。 该主题应该同时在多个地方订阅,即同一主题的多个订阅。 它通过轮询从后端接收数据。由于轮询相同的 api 会产生相同的数据,因此我使用 BehaviorSubject.value 将来自后端的新响应与 Subject 的现有数据进行比较,如下面的演示所示
https://stackblitz.com/edit/9ih9mv?file=index.ts
我的队友看到了 Ben Leah 对此话题的评论
根据@BenLesh 在此线程上的回答,我的团队强烈反对使用 .value 或 .getValue() 进行数据比较。 如果我有一个服务,其主题在加载 class 时被初始化,而在 class 被销毁(ngOnDestroy)时被取消订阅,那么在调用 .next() 之前使用 .value 比较数据是否有任何问题?
仅供参考,我尽量不使用 distinctUntilChanged,因为如果我有 1 个主题订阅多个订阅,它将触发多次比较(即每个订阅 1 次)。
您可以使用 pairwise() 运算符并比较两个值
文档:https://rxjs.dev/api/operators/pairwise
concatMap(() => this.getDataFromBackend()),
pairwise(),
switchMap(([oldResponse, newResponse]) => {
// ...logic
if (!Lodash.isEqual(oldResponse, newResponse)) {
this.subject.next(newResponse);
}
})
Ben Lesh 回答的症结在于,使用 .value
表明您没有充分利用 RxJS。
If you're using getValue() you're doing something imperative in declarative paradigm.
在较小程度上,一般情况下主题也是如此。它们通常用于两个目的之一。多播,或命令式代码和声明式代码之间的桥梁。
这里您只需要多播组件。在大多数情况下,您可以使用操作员(他们在后台使用主题)为您完成此操作。
您在这里的许多歌舞都是以声明方式实现的distinctUntilChanged
。通过这样做,您创建了一个既慢得多(在这里应该无关紧要)又更难维护(在这里应该重要)的版本。
以下是我如何重构您的代码(使用 shareReplay
和 distinctUntilChanged
),使其更符合教条式的 RxJS。
interface Something {
length: number
}
class SomeService implements OnInit, OnDestroy {
/* Errors are "false", Data without a length is "undefined", and
everything else is "something". I wouldn't reccomend this,
but as an example, sure.
*/
private dataOb$: Observable<(Something | Boolean | undefined)[]>
private pollingSubscription: Subscription;
constructor(private readonly httpClient: HttpClient) {
}
ngOnInit() {
this.dataOb$ = timer(0,1000).pipe(
concatMap(() => this.getDataFromBackend()),
distinctUntilChanged(Lodash.isEqual),
shareReplay(1) // multicasting
)
// This service is effectively "using" itself. This means
// the polling continues even if nobody else is listening.
this.pollingSubscription = this.getDataObs().subscribe()
}
private getDataFromBackend(): Observable<(Something | Boolean | undefined)[]> {
// This is a bizzaar function, but I assume it's just as an example
return this.httpClient.get(url, options).pipe(
map((response: Something[]) => {
if (response?.length > 0) {
return response;
}
return undefined;
}),
catchError(() => of(false))
)
}
// I changed this from a private method
getDataObs(): Observable<(Something | Boolean | undefined)[]> {
return this.dataOb$
}
ngOnDestroy() {
this.pollingSubscription.unsubscribe();
}
}
顺便说一句:
Array<int>
与 int[]
和
相同
Array<int|boolean>
等同于 (<int|boolean>)[]
更新
如果您想(例如)忽略错误和空排放:
private getDataFromBackend(): Observable<Something[]> {
return this.httpClient.get<Something[]>(url, options).pipe(
filter(response => response?.length > 0),
catchError(() => EMPTY)
)
}